Merge branch 'master' into mvcc_prototype

Alexander Tokmakov 2022-03-10 13:13:04 +01:00
commit 061fa6a6f2
85 changed files with 1040 additions and 291 deletions

View File

@ -1,11 +1,11 @@
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main/" | sudo tee \
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
sudo service clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
clickhouse-client # or "clickhouse-client --password" if you've set up a password.

View File

@ -14,7 +14,6 @@ module.exports = {
entry: [
path.resolve(scssPath, 'bootstrap.scss'),
path.resolve(scssPath, 'greenhouse.scss'),
path.resolve(scssPath, 'main.scss'),
path.resolve(jsPath, 'main.js'),
],

View File

@ -151,6 +151,11 @@ def build_website(args):
)
)
shutil.copytree(
os.path.join(args.website_dir, 'images'),
os.path.join(args.output_dir, 'docs', 'images')
)
# This file can be requested to check for available ClickHouse releases.
shutil.copy2(
os.path.join(args.src_dir, 'utils', 'list-versions', 'version_date.tsv'),
@ -231,28 +236,31 @@ def minify_file(path, css_digest, js_digest):
def minify_website(args):
# Output greenhouse css separately from main bundle to be included via the greenhouse iframe
command = f"cat '{args.website_dir}/css/greenhouse.css' > '{args.output_dir}/css/greenhouse.css'"
logging.info(command)
output = subprocess.check_output(command, shell=True)
logging.debug(output)
css_in = ' '.join(get_css_in(args))
css_out = f'{args.output_dir}/css/base.css'
if args.minify:
css_out = f'{args.output_dir}/docs/css/base.css'
os.makedirs(f'{args.output_dir}/docs/css')
if args.minify and False: # TODO: return closure
command = f"purifycss -w '*algolia*' --min {css_in} '{args.output_dir}/*.html' " \
f"'{args.output_dir}/docs/en/**/*.html' '{args.website_dir}/js/**/*.js' > {css_out}"
else:
command = f'cat {css_in} > {css_out}'
logging.info(css_in)
logging.info(command)
output = subprocess.check_output(command, shell=True)
logging.debug(output)
else:
command = f"cat {css_in}"
output = subprocess.check_output(command, shell=True)
with open(css_out, 'wb+') as f:
f.write(output)
logging.info(command)
output = subprocess.check_output(command, shell=True)
logging.debug(output)
with open(css_out, 'rb') as f:
css_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8]
js_in = get_js_in(args)
js_out = f'{args.output_dir}/js/base.js'
js_in = ' '.join(get_js_in(args))
js_out = f'{args.output_dir}/docs/js/base.js'
os.makedirs(f'{args.output_dir}/docs/js')
if args.minify and False: # TODO: return closure
js_in = [js[1:-1] for js in js_in]
closure_args = [
@ -271,11 +279,11 @@ def minify_website(args):
f.write(js_content)
else:
js_in = ' '.join(js_in)
command = f'cat {js_in} > {js_out}'
logging.info(command)
command = f"cat {js_in}"
output = subprocess.check_output(command, shell=True)
logging.debug(output)
with open(js_out, 'wb+') as f:
f.write(output)
with open(js_out, 'rb') as f:
js_digest = hashlib.sha3_224(f.read()).hexdigest()[0:8]
logging.info(js_digest)

View File

@ -1 +0,0 @@
../../../../en/sql-reference/statements/alter/settings-profile.md

View File

@ -0,0 +1,16 @@
---
toc_priority: 48
toc_title: Settings Profile
---
## Altering Settings Profiles {#alter-settings-profile-statement}
Changes settings profiles.
Syntax:
``` sql
ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1]
[, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY|WRITABLE] | INHERIT 'profile_name'] [,...]
```

View File

@ -435,6 +435,8 @@ private:
Progress progress;
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY);
ProfileInfo info;
while (Block block = executor.read())
info.update(block);

View File

@ -144,7 +144,6 @@ list (APPEND dbms_sources
AggregateFunctions/AggregateFunctionState.cpp
AggregateFunctions/AggregateFunctionCount.cpp
AggregateFunctions/parseAggregateFunctionParameters.cpp)
list (APPEND dbms_headers
AggregateFunctions/IAggregateFunction.h
AggregateFunctions/IAggregateFunctionCombinator.h
@ -155,10 +154,25 @@ list (APPEND dbms_headers
AggregateFunctions/FactoryHelpers.h
AggregateFunctions/parseAggregateFunctionParameters.h)
list (APPEND dbms_sources TableFunctions/ITableFunction.cpp TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/TableFunctionFactory.h)
list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers Dictionaries/DictionaryFactory.h Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h)
list (APPEND dbms_sources
TableFunctions/ITableFunction.cpp
TableFunctions/TableFunctionView.cpp
TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers
TableFunctions/ITableFunction.h
TableFunctions/TableFunctionView.h
TableFunctions/TableFunctionFactory.h)
list (APPEND dbms_sources
Dictionaries/DictionaryFactory.cpp
Dictionaries/DictionarySourceFactory.cpp
Dictionaries/DictionaryStructure.cpp
Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers
Dictionaries/DictionaryFactory.h
Dictionaries/DictionarySourceFactory.h
Dictionaries/DictionaryStructure.h
Dictionaries/getDictionaryConfigurationFromAST.h)
if (NOT ENABLE_SSL)
list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp)
@ -253,18 +267,16 @@ if (TARGET ch_contrib::nuraft)
add_object_library(clickhouse_coordination Coordination)
endif()
set (DBMS_COMMON_LIBRARIES)
if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (dbms PRIVATE ch_contrib::jemalloc)
endif()
set (all_modules dbms)
else()
add_library (dbms SHARED ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PUBLIC ${all_modules} ${DBMS_COMMON_LIBRARIES})
target_link_libraries (dbms PUBLIC ${all_modules})
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::jemalloc)
@ -557,6 +569,10 @@ if (ENABLE_TESTS)
clickhouse_common_zookeeper
string_utils)
if (TARGET ch_contrib::yaml_cpp)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::yaml_cpp)
endif()
add_check(unit_tests_dbms)
endif ()

View File

@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas)
bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1
&& settings.allow_experimental_parallel_reading_from_replicas
/// To avoid trying to coordinate with clickhouse-benchmark,
/// since it uses the same code.
&& client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY;
if (parallel_reading_from_replicas)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info.all_replicas_count;

View File

@ -3,13 +3,9 @@
#if USE_YAML_CPP
#include "YAMLParser.h"
#include <string>
#include <cstring>
#include <vector>
#include <Poco/DOM/Document.h>
#include <Poco/DOM/DOMParser.h>
#include <Poco/DOM/DOMWriter.h>
#include <Poco/DOM/NodeList.h>
#include <Poco/DOM/Element.h>
#include <Poco/DOM/AutoPtr.h>
@ -19,8 +15,6 @@
#include <yaml-cpp/yaml.h>
#include <base/logger_useful.h>
using namespace Poco::XML;
namespace DB
@ -74,30 +68,35 @@ void processNode(const YAML::Node & node, Poco::XML::Element & parent_xml_elemen
case YAML::NodeType::Sequence:
{
for (const auto & child_node : node)
if (parent_xml_element.hasChildNodes())
/// For sequences it depends on how we want to process them.
/// Sequences of key-value pairs such as:
/// seq:
/// - k1: val1
/// - k2: val2
/// into xml like this:
/// <seq>
/// <k1>val1</k1>
/// <k2>val2</k2>
/// </seq>
///
/// But, if the sequence is just a list, the root-node needs to be repeated, such as:
/// seq:
/// - val1
/// - val2
/// into xml like this:
/// <seq>val1</seq>
/// <seq>val2</seq>
///
/// Therefore check what type the child is, for further processing.
/// Mixing types (values list or map) will lead to strange results but should not happen.
if (parent_xml_element.hasChildNodes() && !child_node.IsMap())
{
/// We want to process sequences like that:
/// seq:
/// - val1
/// - k2: val2
/// - val3
/// - k4: val4
/// - val5
/// into xml like this:
/// <seq>val1</seq>
/// <seq>
/// <k2>val2</k2>
/// </seq>
/// <seq>val3</seq>
/// <seq>
/// <k4>val4</k4>
/// </seq>
/// <seq>val5</seq>
/// So, we create a new parent node with same tag for each child node
/// Create a new parent node with same tag for each child node
processNode(child_node, *createCloneNode(parent_xml_element));
}
else
{
/// Map, so don't recreate the parent node but add directly
processNode(child_node, parent_xml_element);
}
break;
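The branch above is driven entirely by yaml-cpp's per-child type check. Below is a small standalone sketch (it assumes only yaml-cpp and is not the ClickHouse YAMLParser itself) showing what IsMap() reports for the two sequence shapes described in the comment, which is what decides whether the parent element is reused or cloned:

```cpp
#include <yaml-cpp/yaml.h>
#include <iostream>
#include <string>

int main()
{
    // The two sequence shapes from the comment above.
    const char * text =
        "kv_seq:\n"
        "  - k1: val1\n"
        "  - k2: val2\n"
        "plain_seq:\n"
        "  - val1\n"
        "  - val2\n";

    YAML::Node root = YAML::Load(text);

    for (const auto & child : root["kv_seq"])
        std::cout << "kv_seq child IsMap=" << child.IsMap()
                  << " -> keys are merged into a single <kv_seq> element\n";

    for (const auto & child : root["plain_seq"])
        std::cout << "plain_seq child '" << child.as<std::string>()
                  << "' -> emitted as its own <plain_seq> element\n";
    return 0;
}
```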

View File

@ -9,6 +9,7 @@
#include <boost/noncopyable.hpp>
#include <base/strong_typedef.h>
#include <base/getPageSize.h>
#include <Common/Allocator.h>
#include <Common/Exception.h>
@ -196,7 +197,7 @@ protected:
/// The operation is slow and performed only for debug builds.
void protectImpl(int prot)
{
static constexpr size_t PROTECT_PAGE_SIZE = 4096;
static size_t PROTECT_PAGE_SIZE = ::getPageSize();
char * left_rounded_up = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_start) - pad_left + PROTECT_PAGE_SIZE - 1) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE);
char * right_rounded_down = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_end_of_storage) + pad_right) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE);
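For context, the rounding below only matters because mprotect() operates on whole pages. A minimal standalone sketch (POSIX only, not ClickHouse's allocator) of the same arithmetic, with the page size queried at runtime instead of assumed to be 4096:

```cpp
#include <sys/mman.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <cstdlib>

int main()
{
    const size_t page_size = static_cast<size_t>(::sysconf(_SC_PAGESIZE));

    // Allocate a few pages so there is something to protect.
    void * mem = nullptr;
    if (posix_memalign(&mem, page_size, 4 * page_size) != 0)
        return 1;

    // Pretend the interesting region starts and ends at unaligned addresses.
    char * begin = static_cast<char *>(mem) + 100;
    char * end = static_cast<char *>(mem) + 3 * page_size + 50;

    // Round the start up and the end down, the same arithmetic as protectImpl().
    char * left_rounded_up = reinterpret_cast<char *>(
        (reinterpret_cast<uintptr_t>(begin) + page_size - 1) / page_size * page_size);
    char * right_rounded_down = reinterpret_cast<char *>(
        reinterpret_cast<uintptr_t>(end) / page_size * page_size);

    if (left_rounded_up < right_rounded_down)
        ::mprotect(left_rounded_up, right_rounded_down - left_rounded_up, PROT_READ);

    std::printf("page size %zu, protected %td bytes\n",
                page_size, right_rounded_down - left_rounded_up);

    // Make the memory writable again before returning it to the allocator.
    ::mprotect(mem, 4 * page_size, PROT_READ | PROT_WRITE);
    std::free(mem);
    return 0;
}
```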

View File

@ -0,0 +1,75 @@
#pragma once
#include <filesystem>
#include <Common/filesystemHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/Config/ConfigProcessor.h>
#include <Poco/AutoPtr.h>
#include "Poco/DOM/Document.h"
#include "Poco/DOM/NodeList.h"
#include "Poco/DOM/NamedNodeMap.h"
const std::string tmp_path = "/tmp/";
inline std::unique_ptr<Poco::File> getFileWithContents(const char *fileName, const char *fileContents)
{
using namespace DB;
namespace fs = std::filesystem;
using File = Poco::File;
fs::create_directories(fs::path(tmp_path));
auto config_file = std::make_unique<File>(tmp_path + fileName);
{
WriteBufferFromFile out(config_file->path());
writeString(fileContents, out);
}
return config_file;
}
inline std::string xmlNodeAsString(Poco::XML::Node *pNode)
{
const auto& node_name = pNode->nodeName();
Poco::XML::XMLString result = "<" + node_name ;
auto *attributes = pNode->attributes();
for (auto i = 0; i<attributes->length();i++)
{
auto *item = attributes->item(i);
auto name = item->nodeName();
auto text = item->innerText();
result += (" " + name + "=\"" + text + "\"");
}
result += ">";
if (pNode->hasChildNodes() && pNode->firstChild()->nodeType() != Poco::XML::Node::TEXT_NODE)
{
result += "\n";
}
attributes->release();
auto *list = pNode->childNodes();
for (auto i = 0; i<list->length();i++)
{
auto *item = list->item(i);
auto type = item->nodeType();
if (type == Poco::XML::Node::ELEMENT_NODE)
{
result += xmlNodeAsString(item);
}
else if (type == Poco::XML::Node::TEXT_NODE)
{
result += item->innerText();
}
}
list->release();
result += ("</"+ node_name + ">\n");
return Poco::XML::fromXMLString(result);
}

View File

@ -0,0 +1,78 @@
#include <Common/config.h>
#if USE_YAML_CPP
#include "gtest_helper_functions.h"
#include <base/scope_guard.h>
#include <Common/Config/YAMLParser.h>
#include <Common/Config/ConfigHelper.h>
#include <Poco/AutoPtr.h>
#include "Poco/DOM/Document.h"
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, YamlParserInvalidFile)
{
ASSERT_THROW(YAMLParser::parse("some-non-existing-file.yaml"), Exception);
}
TEST(Common, YamlParserProcessKeysList)
{
auto yaml_file = getFileWithContents("keys-list.yaml", R"YAML(
operator:
access_management: "1"
networks:
- ip: "10.1.6.168"
- ip: "::1"
- ip: "127.0.0.1"
)YAML");
SCOPE_EXIT({ yaml_file->remove(); });
Poco::AutoPtr<Poco::XML::Document> xml = YAMLParser::parse(yaml_file->path());
auto *p_node = xml->getNodeByPath("/clickhouse");
EXPECT_EQ(xmlNodeAsString(p_node), R"CONFIG(<clickhouse>
<operator>
<access_management>1</access_management>
<networks>
<ip>10.1.6.168</ip>
<ip>::1</ip>
<ip>127.0.0.1</ip>
</networks>
</operator>
</clickhouse>
)CONFIG");
}
TEST(Common, YamlParserProcessValuesList)
{
auto yaml_file = getFileWithContents("values-list.yaml", R"YAML(
rules:
- apiGroups: [""]
resources:
- nodes
- nodes/proxy
- services
- endpoints
- pods
)YAML");
SCOPE_EXIT({ yaml_file->remove(); });
Poco::AutoPtr<Poco::XML::Document> xml = YAMLParser::parse(yaml_file->path());
auto *p_node = xml->getNodeByPath("/clickhouse");
EXPECT_EQ(xmlNodeAsString(p_node), R"CONFIG(<clickhouse>
<rules>
<apiGroups></apiGroups>
<resources>nodes</resources>
<resources>nodes/proxy</resources>
<resources>services</resources>
<resources>endpoints</resources>
<resources>pods</resources>
</rules>
</clickhouse>
)CONFIG");
}
#endif

View File

@ -3,6 +3,8 @@
#include <Coordination/Defines.h>
#include <Common/Exception.h>
#include <filesystem>
#include <Common/isLocalAddress.h>
#include <Common/DNSResolver.h>
namespace DB
{
@ -12,6 +14,70 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
namespace
{
bool isLoopback(const std::string & hostname)
{
try
{
return DNSResolver::instance().resolveHost(hostname).isLoopback();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return false;
}
bool isLocalhost(const std::string & hostname)
{
try
{
return isLocalAddress(DNSResolver::instance().resolveHost(hostname));
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return false;
}
std::unordered_map<UInt64, std::string> getClientPorts(const Poco::Util::AbstractConfiguration & config)
{
static const char * config_port_names[] = {
"keeper_server.tcp_port",
"keeper_server.tcp_port_secure",
"interserver_http_port",
"interserver_https_port",
"tcp_port",
"tcp_with_proxy_port",
"tcp_port_secure",
"mysql_port",
"postgresql_port",
"grpc_port",
"prometheus.port",
};
std::unordered_map<UInt64, std::string> ports;
for (const auto & config_port_name : config_port_names)
{
if (config.has(config_port_name))
ports[config.getUInt64(config_port_name)] = config_port_name;
}
return ports;
}
}
/// This function is quite long because it contains a lot of sanity checks in the config:
/// 1. No duplicate endpoints
/// 2. No "localhost" or "127.0.0.1" or other local addresses mixed with normal addresses
/// 3. The Raft internal port is not equal to any other client port
/// 4. No duplicate IDs
/// 5. Our ID is present in the hostnames list
KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const
{
KeeperConfigurationWrapper result;
@ -19,12 +85,17 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix + ".raft_configuration", keys);
auto client_ports = getClientPorts(config);
/// Sometimes (especially in cloud environments) users can provide an incorrect
/// configuration with duplicated Raft ids or endpoints. We check for them
/// at the config parsing stage and never commit such a configuration to the quorum.
std::unordered_map<std::string, int> check_duplicated_hostnames;
size_t total_servers = 0;
std::string loopback_hostname;
std::string non_local_hostname;
size_t local_address_counter = 0;
for (const auto & server_key : keys)
{
if (!startsWith(server_key, "server"))
@ -38,13 +109,33 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
int32_t priority = config.getInt(full_prefix + ".priority", 1);
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
if (client_ports.count(port) != 0)
{
throw Exception(ErrorCodes::RAFT_ERROR, "Raft configuration contains hostname '{}' with port '{}' which is equal to '{}' in server configuration",
hostname, port, client_ports[port]);
}
if (isLoopback(hostname))
{
loopback_hostname = hostname;
local_address_counter++;
}
else if (isLocalhost(hostname))
{
local_address_counter++;
}
else
{
non_local_hostname = hostname;
}
if (start_as_follower)
result.servers_start_as_followers.insert(new_server_id);
auto endpoint = hostname + ":" + std::to_string(port);
if (check_duplicated_hostnames.count(endpoint))
{
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate endpoints: "
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate endpoints: "
"endpoint {} has been already added with id {}, but going to add it one more time with id {}",
endpoint, check_duplicated_hostnames[endpoint], new_server_id);
}
@ -54,7 +145,7 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
for (const auto & [id_endpoint, id] : check_duplicated_hostnames)
{
if (new_server_id == id)
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate ids: id {} has been already added with endpoint {}, "
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate ids: id {} has been already added with endpoint {}, "
"but going to add it one more time with endpoint {}", id, id_endpoint, endpoint);
}
check_duplicated_hostnames.emplace(endpoint, new_server_id);
@ -77,6 +168,24 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
if (result.servers_start_as_followers.size() == total_servers)
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
if (!loopback_hostname.empty() && !non_local_hostname.empty())
{
throw Exception(
ErrorCodes::RAFT_ERROR,
"Mixing loopback and non-local hostnames ('{}' and '{}') in raft_configuration is not allowed. "
"Different hosts can resolve it to themselves so it's not allowed.",
loopback_hostname, non_local_hostname);
}
if (!non_local_hostname.empty() && local_address_counter > 1)
{
throw Exception(
ErrorCodes::RAFT_ERROR,
"Local address specified more than once ({} times) and non-local hostnames also exists ('{}') in raft_configuration. "
"Such configuration is not allowed because single host can vote multiple times.",
local_address_counter, non_local_hostname);
}
return result;
}

View File

@ -30,7 +30,7 @@ namespace ErrorCodes
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table",
"update_field", "update_tag", "invalidate_query", "query", "where", "name", "secure"};
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "secure"};
namespace
{

View File

@ -34,7 +34,7 @@ static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password",
"db", "database", "table", "schema",
"update_field", "invalidate_query", "priority",
"update_tag", "dont_check_update_time",
"update_lag", "dont_check_update_time",
"query", "where", "name" /* name_collection */, "socket",
"share_connection", "fail_on_connection_loss", "close_connection",
"ssl_ca", "ssl_cert", "ssl_key",

View File

@ -30,7 +30,7 @@ static const UInt64 max_block_size = 8192;
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"update_field", "update_tag", "invalidate_query", "query", "where", "name", "priority"};
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
namespace
{

View File

@ -6,6 +6,7 @@
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Common/renameat2.h>
#include <IO/createReadBufferFromFileBase.h>
#include <fstream>
@ -325,7 +326,7 @@ DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
void DiskLocal::moveFile(const String & from_path, const String & to_path)
{
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}
void DiskLocal::replaceFile(const String & from_path, const String & to_path)

View File

@ -286,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size /*, std::move(schedule) */);
buf_size, std::move(schedule));
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{

View File

@ -267,7 +267,7 @@ public:
*/
virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const
{
throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED);
}
};
@ -452,7 +452,7 @@ public:
using Monotonicity = IFunctionBase::Monotonicity;
virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const
{
throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED);
}
/// For non-variadic functions, return number of arguments; otherwise return zero (that should be ignored).

View File

@ -11,7 +11,6 @@
#include "HadoopSnappyReadBuffer.h"
namespace DB
{
namespace ErrorCodes
@ -32,11 +31,11 @@ inline bool HadoopSnappyDecoder::checkAvailIn(size_t avail_in, int min)
inline void HadoopSnappyDecoder::copyToBuffer(size_t * avail_in, const char ** next_in)
{
assert(*avail_in <= sizeof(buffer));
assert(*avail_in + buffer_length <= sizeof(buffer));
memcpy(buffer, *next_in, *avail_in);
memcpy(buffer + buffer_length, *next_in, *avail_in);
buffer_length = *avail_in;
buffer_length += *avail_in;
*next_in += *avail_in;
*avail_in = 0;
}
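The copyToBuffer() change above is the heart of the fix: when a length field or block header arrives split across several reads, each new fragment must be appended after the bytes already buffered instead of overwriting them. A small standalone sketch of the same accumulate-across-calls pattern (the LengthReader type is hypothetical, not the ClickHouse decoder):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iostream>

struct LengthReader
{
    char buffer[4];
    size_t buffer_length = 0;

    // Returns true once all 4 big-endian length bytes have been accumulated.
    bool feed(const char ** next_in, size_t * avail_in, uint32_t & length)
    {
        size_t take = std::min(4 - buffer_length, *avail_in);
        memcpy(buffer + buffer_length, *next_in, take); // append, don't overwrite
        buffer_length += take;
        *next_in += take;
        *avail_in -= take;

        if (buffer_length < 4)
            return false; // need more input, keep what we buffered so far

        length = (uint32_t(uint8_t(buffer[0])) << 24) | (uint32_t(uint8_t(buffer[1])) << 16)
               | (uint32_t(uint8_t(buffer[2])) << 8) | uint32_t(uint8_t(buffer[3]));
        buffer_length = 0;
        return true;
    }
};

int main()
{
    LengthReader reader;
    const char part1[] = {0, 0}; // first call delivers only 2 of the 4 bytes
    const char part2[] = {1, 0}; // the rest arrives on the next call
    uint32_t length = 0;

    const char * in = part1;
    size_t avail = sizeof(part1);
    bool done = reader.feed(&in, &avail, length);
    std::cout << "after first fragment: done = " << done << "\n"; // 0, still waiting

    in = part2;
    avail = sizeof(part2);
    done = reader.feed(&in, &avail, length);
    std::cout << "after second fragment: done = " << done
              << ", length = " << length << "\n"; // 1, length = 256
    return 0;
}
```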
@ -78,14 +77,21 @@ inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readLength(size_t * avai
inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readBlockLength(size_t * avail_in, const char ** next_in)
{
if (block_length < 0)
{
return readLength(avail_in, next_in, &block_length);
}
return Status::OK;
}
inline HadoopSnappyDecoder::Status HadoopSnappyDecoder::readCompressedLength(size_t * avail_in, const char ** next_in)
{
if (compressed_length < 0)
return readLength(avail_in, next_in, &compressed_length);
{
auto status = readLength(avail_in, next_in, &compressed_length);
if (unlikely(compressed_length > 0 && static_cast<size_t>(compressed_length) > sizeof(buffer)))
throw Exception(ErrorCodes::SNAPPY_UNCOMPRESS_FAILED, "Too large snappy compressed block. buffer size: {}, compressed block size: {}", sizeof(buffer), compressed_length);
return status;
}
return Status::OK;
}
@ -111,7 +117,6 @@ HadoopSnappyDecoder::readCompressedData(size_t * avail_in, const char ** next_in
{
compressed = const_cast<char *>(*next_in);
}
size_t uncompressed_length = *avail_out;
auto status = snappy_uncompress(compressed, compressed_length, *next_out, &uncompressed_length);
if (status != SNAPPY_OK)
@ -154,7 +159,9 @@ HadoopSnappyDecoder::Status HadoopSnappyDecoder::readBlock(size_t * avail_in, co
return status;
}
if (total_uncompressed_length != block_length)
{
return Status::INVALID_INPUT;
}
return Status::OK;
}

View File

@ -38,6 +38,11 @@ int main()
return 1;
}
}
if (uncompress(256) != output)
{
std::cout << "test hadoop snappy read buffer failed, buf_size:" << 256 << std::endl;
return 1;
}
std::cout << "test hadoop snappy read buffer success" << std::endl;
return 0;
}

View File

@ -0,0 +1,66 @@
#include <snappy.h>
#include <IO/HadoopSnappyReadBuffer.h>
#include <gtest/gtest.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Poco/Base64Decoder.h>
#include <Poco/Base64Encoder.h>
#include <Poco/MemoryStream.h>
#include <Poco/StreamCopier.h>
#include <Poco/String.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
using namespace DB;
TEST(HadoopSnappyDecoder, repeatNeedMoreInput)
{
String snappy_base64_content = "AAAl6gAAB67qSzQyMDIxLTA2LTAxAXh4eGIEACQzMTkuNzQyNDMKnjEAHDQyLjgyMTcynjEAIDI5Ni4yODQwNqIxA"
"BgyNy43MjYzqpMAGDMuNTIyMzSiYgAcNjUuNDk1OTeiMQAYOTQuNTg1NaYxABg4OC40NzgyojEAHDMyMS4zOTE1os"
"QAHDM0Ni4xNTI3qjEAGDEuMjA3MTWm9QAQMi41MjamYQAcMjIuNTEyNDieYQAcMzMwLjI5MTKiGgIcMzIzLjAzNDi"
"iwwAcMzE1LjA1MDmiYgAcNDM1Ljc2ODaqxAAUMS45NDA5nvQACDAuMP4rAEorABwzMDMuMjAyNaYZARgwOC4xOTEy"
"pugAGDQ2LjQ0MjKilQMcMjc4Ljk3MTiiMQAcMzUwLjc3NTeirAGqSwEcMzI5LjkyMzGiXAAcMzMxLjc2NzamwAMUM"
"TMuNjM4pjEAGDI3NC4yMzK2MQAINDg0qrMBFDExLjgzNqbbBRgyNDkuNTI5qtsFGDUwLjE4ODmi5AGlSAgwNjWmiA"
"EUMjIuNjU4pqcCBDUzYcCqdgIYMDEuMzcxNbbPBgQ5Na5TBBA0Ljc1OaIiBMGdDDM0OTGeJwQcMjg3LjIyNTmm/AM"
"hVAAyopAAGDMxOC4wMjGmMAAB8AQ0OKpGAhgyMC42MTM4poMBFDg3LjEzOKoxABA5My4xNaZSARQ5NS41ODemTgVh"
"OQwwODg2osIAGDMyNi45NTSmMQAcMjc3LjgxNDmqjQcMNS42MqpqA0F3DDg2MDamzAPhKwQ4OKJWARgzMDYuMTc1q"
"i0EGDgwLjIwNTSihAUYMjk3LjY5NaYiBRAyOTAuM6aNBBgyMzkuMzI5pkIJwdOi7wcYMzcxLjIyNqpiBxQ0NS44Nz"
"Gq9woEODAOZAoANqJ+BRgyNzYuMjExpnYCIYIMMjIyOKKnAmVrBDc0psQAEDMwOS4xqtEJGDMwNC45MzSq8wAMNC4"
"0OKomCyG3DDE4MTGi/AMhJAQxMKqjBhgyNjEuNDQ4rqMGFDIuOTEwN6I5AwQzN7JMCQw2LjcwqqoMGDI2MC44NzOm"
"dwIOTAkMNDgzMqLSBhQyNTkuMjGmYweBiwg3MzOmyQMYNDM3Ljg1N6ZyBq5QARQzMy43MjSqKw4UMTIuNzkxpkkFD"
"mgNDDc4MzCmUAEUOTUuOTQypnoFDiQIDDI2ODmmBQMUNTEuMjc2qikEDtkJBDA1qgUDFDA3LjE1N6ZiAOGUCDMwOa"
"oxABA3NC42NqqmAhA5Ni45N6rIAxwzMDcuMjkzMaL+ChQyNzUuODau/QoANOExpugBGDI0Ny4xODSm5wEYOTEuNDE"
"3MZ7MChQzMzUuNjWquQQUNTMuODg1psMHDu8SCDIyOaYJDoFbCDk4M6aWDhwzNDEuNTcyMKK1AUF4ADSqCwoQMzg1"
"LjSujBIB9Aw0MDUwotoJDi4PCDc0N6aHARgyMjMuODMxpgYRwmcRGDIxMi4xNjWqSgIQMDkuODmuzgMYMTkuNTg0M"
"aK7CMFFADmuZQcQMDYuMzKqXwAIOS4zrl8ADu4PBDQ0qtQUGDQ3LjAzODGmFwIYMTAuOTIwMKLDAKG0DDM3MDOiYg"
"CqNgcORgkEMzeuGwWqXQAhqwg2MDWmSQUYMjY0LjE2N6aZFBIgFgQyM6aiCRQwNi41NTSm9AcYMjczLjczNqqSABg"
"0NS45OTIzpugPFDIxLjc3MqZ4EBwyODYuMDkyNKZAAhg0OS4yMjQzom8GDu0LCDEwNKaTBwAzDiUIADimGQkUMzM4"
"Ljc2qlITADcOmBUAOaYNBhwyNzAuODA4N6qrCQw3LjAwppkYwT4IMjYzrg0GDDMuOTmq/xEQMjIuODOqRgkEMjQOX"
"xKmQA0IMzAwDggVqjwREDY1LjYxsh8aCDQuOKrCBxgyNTQuNjQ2phMUISQENzmqsAwOLgsENTWqeAIQOTEuNTiuzR"
"EANw55CQAwpp8GEDI2My44rgsRFDI0LjMxNqZuBhIrFgAxqswDGDI4OS4zMzCqXwQANoHyADCmbAMUMzI4LjM2pps"
"DDDY1LjKBj57+Cg5PFwQ1NaoVBmFrADaqwgccMjk5LjgxMTCqdwYQMy4wODKmZwcEMzIOqBQAMaaCBRgyMjUuMTE2"
"qtkJADEOLw8AMKYwBBgyMzAuMTQyprwPGDMwMi4wMjemiAEOzQ4MODA0M6YaAhA1NC4yNKYkBWEMDsELqmEAFDIuN"
"jE4N6LNBxgyODMuNTM1qqUfFDk5Ljc4NKaaGQ5UEAgyNjSuqw2usgkYNDMuMDY0MZ5rAyHkCDMzOa6sHg6+CwAwpn"
"YGDnseCDk1MqaoAsHYDDgzNjeiLgsYMjg1LjkzMqZ1EQ67IQgyNTmmMQBB2Qg0OTamuhMUMjcxLjkzqpMWBDMyDoo"
"hADSmYgChhAg2NjimeAIQMzkxLjiqyw4IOTkuDt8bpoYBDDk0LjamaQMO4hAIOTI3qqQYFDQyLjk1M6oxAAQ4NA7G"
"HaZKIg6YCwwxNzYzpiQXFDkwLjk0OKqqAhQ5Ny4yNzSmvwQANg54GKq/CA4AIQg1MzOm/wMUNTYuNzQ2phcCHDM0N"
"S4wOTEyoswHDoAQCDA5M6rOGRA5MS42N6ZPGyQyNzUuNzExMTIK";
String snappy_content;
Poco::MemoryInputStream istr(snappy_base64_content.data(), snappy_base64_content.size());
Poco::Base64Decoder decoder(istr);
Poco::StreamCopier::copyToString(decoder, snappy_content);
auto file_writer = std::make_unique<WriteBufferFromFile>("./test.snappy");
file_writer->write(snappy_content.c_str(), snappy_content.size());
file_writer->close();
std::unique_ptr<ReadBuffer> in = std::make_unique<ReadBufferFromFile>("./test.snappy", 128);
HadoopSnappyReadBuffer read_buffer(std::move(in));
String output;
WriteBufferFromString out(output);
copyData(read_buffer, out);
UInt128 hashcode = sipHash128(output.c_str(), output.size());
String hashcode_str = getHexUIntLowercase(hashcode);
ASSERT_EQ(hashcode_str, "593afe14f61866915cc00b8c7bd86046");
}

View File

@ -207,7 +207,6 @@ public:
using ShardsInfo = std::vector<ShardInfo>;
String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
@ -263,7 +262,6 @@ private:
/// Inter-server secret
String secret;
String hash_of_addresses;
/// Description of the cluster shards.
ShardsInfo shards_info;
/// Any remote shard.

View File

@ -116,7 +116,7 @@ void executeQuery(
const Settings & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
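Both here and in DistributedSink below, the comparison changes from '>' to '>='. Since distributed_depth is incremented only after a check like this (see `context->getClientInfo().distributed_depth += 1` in the DistributedSink hunk further down), the strict comparison effectively allowed one hop more than the setting requested. A tiny standalone sketch (a hypothetical loop, not the real query path) of the off-by-one:

```cpp
#include <iostream>

static int countHops(bool strict_greater)
{
    const unsigned max_distributed_depth = 1;
    unsigned distributed_depth = 0;
    int hops = 0;

    while (hops < 10)
    {
        // The check performed before forwarding the query one level deeper.
        bool exceeded = strict_greater ? (distributed_depth > max_distributed_depth)
                                       : (distributed_depth >= max_distributed_depth);
        if (max_distributed_depth && exceeded)
            break; // would throw TOO_LARGE_DISTRIBUTED_DEPTH

        ++distributed_depth; // incremented only after the check, as in the code above
        ++hops;
    }
    return hops;
}

int main()
{
    std::cout << "'>'  allows " << countHops(true)  << " hops with max_distributed_depth = 1\n"; // 2
    std::cout << "'>=' allows " << countHops(false) << " hops with max_distributed_depth = 1\n"; // 1
    return 0;
}
```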

View File

@ -1918,20 +1918,16 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
else if (interpreter_subquery)
{
/// Subquery.
/// If we need less number of columns that subquery have - update the interpreter.
if (required_columns.size() < source_header.columns())
{
ASTPtr subquery = extractTableExpression(query, 0);
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
ASTPtr subquery = extractTableExpression(query, 0);
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
}
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
interpreter_subquery->buildQueryPlan(query_plan);
query_plan.addInterpreterContext(context);

View File

@ -208,8 +208,10 @@ Block InterpreterSelectWithUnionQuery::getCurrentChildResultHeader(const ASTPtr
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return InterpreterSelectWithUnionQuery(ast_ptr_, context, options.copy().analyze().noModify(), required_result_column_names)
.getSampleBlock();
else
else if (ast_ptr_->as<ASTSelectQuery>())
return InterpreterSelectQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock();
else
return InterpreterSelectIntersectExceptQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock();
}
std::unique_ptr<IInterpreterUnionOrSelectQuery>

View File

@ -183,7 +183,9 @@ std::unique_ptr<InterpreterSelectWithUnionQuery> JoinedTables::makeLeftTableSubq
{
if (!isLeftTableSubquery())
return {};
return std::make_unique<InterpreterSelectWithUnionQuery>(left_table_expression, context, select_options);
/// Only build dry_run interpreter during analysis. We will reconstruct the subquery interpreter during plan building.
return std::make_unique<InterpreterSelectWithUnionQuery>(left_table_expression, context, select_options.copy().analyze());
}
StoragePtr JoinedTables::getLeftTableStorage()

View File

@ -445,7 +445,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
* 1: the intersection of the set and the range is non-empty
* 2: the range contains elements not in the set
*/
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point) const
{
size_t tuple_size = indexes_mapping.size();
@ -468,7 +468,8 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
std::optional<Range> new_range = KeyCondition::applyMonotonicFunctionsChainToRange(
key_ranges[indexes_mapping[i].key_index],
indexes_mapping[i].functions,
data_types[indexes_mapping[i].key_index]);
data_types[indexes_mapping[i].key_index],
single_point);
if (!new_range)
return {true, true};

View File

@ -214,7 +214,7 @@ public:
bool hasMonotonicFunctionsChain() const;
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point = false) const;
private:
// If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.

View File

@ -442,9 +442,9 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("^["),
to_remove,
std::make_shared<ASTLiteral>("]*|["),
std::make_shared<ASTLiteral>("]+|["),
to_remove,
std::make_shared<ASTLiteral>("]*$")
std::make_shared<ASTLiteral>("]+$")
};
func_name = "replaceRegexpAll";
}
@ -455,7 +455,7 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("^["),
to_remove,
std::make_shared<ASTLiteral>("]*")
std::make_shared<ASTLiteral>("]+")
};
}
else
@ -464,7 +464,7 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("["),
to_remove,
std::make_shared<ASTLiteral>("]*$")
std::make_shared<ASTLiteral>("]+$")
};
}
func_name = "replaceRegexpOne";
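The quantifier change from '*' to '+' in these generated patterns does not change which characters get trimmed; it only stops the patterns from also matching the empty string when there is nothing to trim, so replaceRegexpAll is not handed useless empty matches. A standalone illustration (std::regex is used here only because it is self-contained; it is not the engine behind replaceRegexpAll):

```cpp
#include <iostream>
#include <iterator>
#include <regex>
#include <string>

// Count how many (possibly empty) matches a pattern produces over the input.
static std::ptrdiff_t countMatches(const std::string & input, const char * pattern)
{
    std::regex re(pattern);
    return std::distance(std::sregex_iterator(input.begin(), input.end(), re),
                         std::sregex_iterator());
}

int main()
{
    const std::string padded = "  abc  ";
    const std::string plain = "abc";

    // Both quantifiers trim the same characters...
    std::cout << '[' << std::regex_replace(padded, std::regex("^[ ]+|[ ]+$"), "") << "]\n"; // [abc]
    std::cout << '[' << std::regex_replace(padded, std::regex("^[ ]*|[ ]*$"), "") << "]\n"; // [abc]

    // ...but '*' also matches the empty string where there is nothing to trim,
    // so a replace function is handed extra, useless empty matches.
    std::cout << countMatches(plain, "^[ ]+|[ ]+$") << "\n"; // 0
    std::cout << countMatches(plain, "^[ ]*|[ ]*$") << "\n"; // empty matches only (2 with this engine)
    return 0;
}
```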

View File

@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
return res;
}
void RemoteQueryExecutor::sendQuery()
void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind)
{
if (sent_query)
return;
@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
/// Set initial_query_id to query_id for the clickhouse-benchmark.
///
/// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY,
/// due to it executes queries via RemoteBlockInputStream)
if (modified_client_info.initial_query_id.empty())
modified_client_info.initial_query_id = query_id;
modified_client_info.query_kind = query_kind;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;

View File

@ -83,7 +83,13 @@ public:
~RemoteQueryExecutor();
/// Create connection and send query, external tables and scalars.
void sendQuery();
///
/// @param query_kind - kind of query, usually SECONDARY_QUERY,
/// since these are queries between servers
/// (which is what this code was written for in general).
/// But clickhouse-benchmark uses the same code
/// and should pass INITIAL_QUERY.
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY);
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };

View File

@ -27,7 +27,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller) : file_cache_controller(std::move(cache_controller))
LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller)
: file_cache_controller(std::move(cache_controller)), original_readbuffer(nullptr), thread_pool(nullptr)
{
file_buffer = file_cache_controller->value().allocFile();
if (!file_buffer)
@ -35,18 +36,43 @@ LocalFileHolder::LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_cont
ErrorCodes::LOGICAL_ERROR, "Create file readbuffer failed. {}", file_cache_controller->value().getLocalPath().string());
}
LocalFileHolder::LocalFileHolder(
RemoteFileCacheType::MappedHolderPtr cache_controller,
std::unique_ptr<ReadBuffer> original_readbuffer_,
BackgroundSchedulePool * thread_pool_)
: file_cache_controller(std::move(cache_controller))
, file_buffer(nullptr)
, original_readbuffer(std::move(original_readbuffer_))
, thread_pool(thread_pool_)
{
}
LocalFileHolder::~LocalFileHolder()
{
if (original_readbuffer)
{
dynamic_cast<SeekableReadBuffer *>(original_readbuffer.get())->seek(0, SEEK_SET);
file_cache_controller->value().startBackgroundDownload(std::move(original_readbuffer), *thread_pool);
}
}
RemoteReadBuffer::RemoteReadBuffer(size_t buff_size) : BufferWithOwnMemory<SeekableReadBufferWithSize>(buff_size)
{
}
std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size)
ContextPtr context,
IRemoteFileMetadataPtr remote_file_metadata,
std::unique_ptr<ReadBuffer> read_buffer,
size_t buff_size,
bool is_random_accessed)
{
auto remote_path = remote_file_metadata->remote_path;
auto remote_read_buffer = std::make_unique<RemoteReadBuffer>(buff_size);
std::tie(remote_read_buffer->local_file_holder, read_buffer)
= ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer);
= ExternalDataSourceCache::instance().createReader(context, remote_file_metadata, read_buffer, is_random_accessed);
if (remote_read_buffer->local_file_holder == nullptr)
return read_buffer;
remote_read_buffer->remote_file_size = remote_file_metadata->file_size;
@ -55,6 +81,19 @@ std::unique_ptr<ReadBuffer> RemoteReadBuffer::create(
bool RemoteReadBuffer::nextImpl()
{
if (local_file_holder->original_readbuffer)
{
auto status = local_file_holder->original_readbuffer->next();
if (status)
{
BufferBase::set(
local_file_holder->original_readbuffer->buffer().begin(),
local_file_holder->original_readbuffer->buffer().size(),
local_file_holder->original_readbuffer->offset());
}
return status;
}
auto start_offset = local_file_holder->file_buffer->getPosition();
auto end_offset = start_offset + local_file_holder->file_buffer->internalBuffer().size();
local_file_holder->file_cache_controller->value().waitMoreData(start_offset, end_offset);
@ -73,6 +112,16 @@ bool RemoteReadBuffer::nextImpl()
off_t RemoteReadBuffer::seek(off_t offset, int whence)
{
if (local_file_holder->original_readbuffer)
{
auto ret = dynamic_cast<SeekableReadBuffer *>(local_file_holder->original_readbuffer.get())->seek(offset, whence);
BufferBase::set(
local_file_holder->original_readbuffer->buffer().begin(),
local_file_holder->original_readbuffer->buffer().size(),
local_file_holder->original_readbuffer->offset());
return ret;
}
if (!local_file_holder->file_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot call seek() in this buffer. It's a bug!");
/*
@ -88,6 +137,10 @@ off_t RemoteReadBuffer::seek(off_t offset, int whence)
off_t RemoteReadBuffer::getPosition()
{
if (local_file_holder->original_readbuffer)
{
return dynamic_cast<SeekableReadBuffer *>(local_file_holder->original_readbuffer.get())->getPosition();
}
return local_file_holder->file_buffer->getPosition();
}
@ -164,7 +217,7 @@ String ExternalDataSourceCache::calculateLocalPath(IRemoteFileMetadataPtr metada
}
std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>> ExternalDataSourceCache::createReader(
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer)
ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer, bool is_random_accessed)
{
// If something is wrong on startup, rollback to read from the original ReadBuffer.
if (!isInitialized())
@ -180,6 +233,11 @@ std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>> Externa
auto cache = lru_caches->get(local_path);
if (cache)
{
if (!cache->value().isEnable())
{
return {nullptr, std::move(read_buffer)};
}
// The remote file has been updated, need to redownload.
if (!cache->value().isValid() || cache->value().isModified(remote_file_metadata))
{
@ -216,6 +274,17 @@ std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>> Externa
lru_caches->weight());
return {nullptr, std::move(read_buffer)};
}
/*
If read_buffer is seekable, use read_buffer directly inside LocalFileHolder. Once LocalFileHolder is released,
start the download process in the background.
The cache is marked disabled until the download process finishes.
For reading Parquet files from HDFS, this optimization can give up to a 3x speedup.
*/
if (dynamic_cast<SeekableReadBuffer *>(read_buffer.get()) && is_random_accessed)
{
new_cache->value().disable();
return {std::make_unique<LocalFileHolder>(std::move(new_cache), std::move(read_buffer), &context->getSchedulePool()), nullptr};
}
new_cache->value().startBackgroundDownload(std::move(read_buffer), context->getSchedulePool());
return {std::make_unique<LocalFileHolder>(std::move(new_cache)), nullptr};
}

View File

@ -34,10 +34,13 @@ class LocalFileHolder
{
public:
explicit LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller);
~LocalFileHolder() = default;
explicit LocalFileHolder(RemoteFileCacheType::MappedHolderPtr cache_controller, std::unique_ptr<ReadBuffer> original_readbuffer_, BackgroundSchedulePool * thread_pool_);
~LocalFileHolder();
RemoteFileCacheType::MappedHolderPtr file_cache_controller;
std::unique_ptr<ReadBufferFromFileBase> file_buffer;
std::unique_ptr<ReadBuffer> original_readbuffer;
BackgroundSchedulePool * thread_pool;
};
class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
@ -45,7 +48,7 @@ class RemoteReadBuffer : public BufferWithOwnMemory<SeekableReadBufferWithSize>
public:
explicit RemoteReadBuffer(size_t buff_size);
~RemoteReadBuffer() override = default;
static std::unique_ptr<ReadBuffer> create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size);
static std::unique_ptr<ReadBuffer> create(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> read_buffer, size_t buff_size, bool is_random_accessed = false);
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
@ -70,7 +73,8 @@ public:
inline bool isInitialized() const { return initialized; }
std::pair<std::unique_ptr<LocalFileHolder>, std::unique_ptr<ReadBuffer>>
createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer);
createReader(ContextPtr context, IRemoteFileMetadataPtr remote_file_metadata, std::unique_ptr<ReadBuffer> & read_buffer, bool is_random_accessed);
void updateTotalSize(size_t size) { total_size += size; }

View File

@ -169,6 +169,7 @@ void RemoteCacheController::backgroundDownload(ReadBufferPtr remote_read_buffer)
file_status = DOWNLOADED;
flush(true);
data_file_writer.reset();
is_enable = true;
lock.unlock();
more_data_signal.notify_all();
ExternalDataSourceCache::instance().updateTotalSize(file_metadata_ptr->file_size);

View File

@ -63,6 +63,22 @@ public:
std::lock_guard lock(mutex);
return valid;
}
inline bool isEnable()
{
std::lock_guard lock(mutex);
return is_enable;
}
inline void disable()
{
std::lock_guard lock(mutex);
is_enable = false;
}
inline void enable()
{
std::lock_guard lock(mutex);
is_enable = true;
}
IRemoteFileMetadataPtr getFileMetadata() { return file_metadata_ptr; }
inline size_t getFileSize() const { return file_metadata_ptr->file_size; }
@ -83,6 +99,17 @@ private:
IRemoteFileMetadataPtr file_metadata_ptr;
std::filesystem::path local_path;
/**
* is_enable = true only when the remote read buffer has been cached on the local disk.
*
* The first time we access a remote buffer that is not cached on the local disk, we use the original remote buffer directly and mark RemoteCacheController::is_enable = false.
* When that first access is finished, LocalFileHolder starts a background download process by reusing the same remote buffer object. After the download process
* finishes, is_enable is set to true.
*
* So while is_enable = false, if another thread tries to access the same remote file, it falls back to the original remote buffer instead of the local file buffer.
* This avoids multiple threads trying to save the same file to disk at the same time.
*/
bool is_enable = true;
bool valid = true;
size_t local_cache_bytes_read_before_flush;
size_t current_offset;

View File

@ -126,7 +126,7 @@ DistributedSink::DistributedSink(
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;

View File

@ -34,7 +34,7 @@ IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db",
"database", "table", "schema", "replica",
"update_field", "update_tag", "invalidate_query", "query",
"update_field", "update_lag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};

View File

@ -16,7 +16,7 @@ struct ExternalDataSourceConfiguration
{
String host;
UInt16 port = 0;
String username;
String username = "default";
String password;
String database;
String table;

View File

@ -116,13 +116,12 @@ public:
, compression_method(compression_method_)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, to_read_block(sample_block)
, columns_description(getColumnsDescription(sample_block, source_info))
, text_input_field_names(text_input_field_names_)
, format_settings(getFormatSettings(getContext()))
{
/// Initialize to_read_block, which is used to read data from HDFS.
to_read_block = sample_block;
/// Initialize to_read_block, which is used to read data from HDFS.
for (const auto & name_type : source_info->partition_name_types)
{
to_read_block.erase(name_type.name);
@ -171,9 +170,13 @@ public:
size_t buff_size = raw_read_buf->internalBuffer().size();
if (buff_size == 0)
buff_size = DBMS_DEFAULT_BUFFER_SIZE;
remote_read_buf = RemoteReadBuffer::create(getContext(),
std::make_shared<StorageHiveMetadata>("Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
std::move(raw_read_buf), buff_size);
remote_read_buf = RemoteReadBuffer::create(
getContext(),
std::make_shared<StorageHiveMetadata>(
"Hive", getNameNodeCluster(hdfs_namenode_url), uri_with_path, curr_file->getSize(), curr_file->getLastModTs()),
std::move(raw_read_buf),
buff_size,
format == "Parquet" || format == "ORC");
}
else
remote_read_buf = std::move(raw_read_buf);
@ -207,11 +210,17 @@ public:
/// Enrich with partition columns.
auto types = source_info->partition_name_types.getTypes();
auto names = source_info->partition_name_types.getNames();
auto fields = source_info->hive_files[current_idx]->getPartitionValues();
for (size_t i = 0; i < types.size(); ++i)
{
auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]);
auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]);
columns.insert(columns.begin() + previous_idx, column->convertToFullColumnIfConst());
// Only add the required partition columns: partition columns are not read from the read buffer.
// The column must be in sample_block, otherwise sample_block.getPositionByName(names[i]) will throw an exception.
if (!sample_block.has(names[i]))
continue;
auto column = types[i]->createColumnConst(num_rows, fields[i]);
auto previous_idx = sample_block.getPositionByName(names[i]);
columns.insert(columns.begin() + previous_idx, column);
}
/// Enrich with virtual columns.
@ -551,7 +560,34 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
}
return hive_file;
}
bool StorageHive::isColumnOriented() const
{
return format_name == "Parquet" || format_name == "ORC";
}
void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const
{
if (!isColumnOriented())
sample_block = header_block;
UInt32 erased_columns = 0;
for (const auto & column : partition_columns)
{
if (sample_block.has(column))
erased_columns++;
}
if (erased_columns == sample_block.columns())
{
for (size_t i = 0; i < header_block.columns(); ++i)
{
const auto & col = header_block.getByPosition(i);
if (!partition_columns.count(col.name))
{
sample_block.insert(col);
break;
}
}
}
}
Pipe StorageHive::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -617,14 +653,20 @@ Pipe StorageHive::read(
sources_info->table_name = hive_table;
sources_info->hive_metastore_client = hive_metastore_client;
sources_info->partition_name_types = partition_name_types;
const auto & header_block = metadata_snapshot->getSampleBlock();
Block sample_block;
for (const auto & column : column_names)
{
sample_block.insert(header_block.getByName(column));
if (column == "_path")
sources_info->need_path_column = true;
if (column == "_file")
sources_info->need_file_column = true;
}
getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()});
if (num_streams > sources_info->hive_files.size())
num_streams = sources_info->hive_files.size();
@ -636,7 +678,7 @@ Pipe StorageHive::read(
hdfs_namenode_url,
format_name,
compression_method,
metadata_snapshot->getSampleBlock(),
sample_block,
context_,
max_block_size,
text_input_field_names));

View File

@ -53,6 +53,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool isColumnOriented() const override;
protected:
friend class StorageHiveSource;
StorageHive(
@ -88,6 +90,8 @@ private:
HiveFilePtr
createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);
void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;
String hive_metastore_url;
/// Hive database and table

View File

@ -448,7 +448,7 @@ KeyCondition::KeyCondition(
{
for (size_t i = 0, size = key_column_names.size(); i < size; ++i)
{
std::string name = key_column_names[i];
const auto & name = key_column_names[i];
if (!key_columns.count(name))
key_columns[name] = i;
}
@ -1999,7 +1999,7 @@ BoolMask KeyCondition::checkInHyperrectangle(
if (!element.set_index)
throw Exception("Set for IN is not created yet", ErrorCodes::LOGICAL_ERROR);
rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types));
rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types, single_point));
if (element.function == RPNElement::FUNCTION_NOT_IN_SET)
rpn_stack.back() = !rpn_stack.back();
}

View File

@ -126,13 +126,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (ctx->disk->exists(local_new_part_tmp_path))
throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
{
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.emplace(local_tmp_part_basename);
}
global_ctx->data->temporary_parts.add(local_tmp_part_basename);
SCOPE_EXIT(
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.erase(local_tmp_part_basename);
global_ctx->data->temporary_parts.remove(local_tmp_part_basename);
);
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();

View File

@ -1488,7 +1488,7 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa
}
size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds)
size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds)
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
@ -1520,9 +1520,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
{
if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline))
{
if (merger_mutator.hasTemporaryPart(basename))
if (temporary_parts.contains(basename))
{
LOG_WARNING(log, "{} is an active destination for one of merge/mutation (consider increasing temporary_directories_lifetime setting)", full_path);
LOG_WARNING(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else

View File

@ -3,30 +3,31 @@
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/DataDestinationType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <boost/multi_index_container.hpp>
@ -585,7 +586,7 @@ public:
/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds);
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds);
size_t clearEmptyParts();
@ -929,7 +930,6 @@ public:
mutable std::mutex currently_submerging_emerging_mutex;
protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend struct ReplicatedMergeTreeTableMetadata;
@ -1226,6 +1226,8 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
TemporaryParts temporary_parts;
};
/// RAII struct to record big parts that are submerging or emerging.

View File

@ -797,10 +797,4 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat
}
bool MergeTreeDataMergerMutator::hasTemporaryPart(const std::string & basename) const
{
std::lock_guard lock(tmp_parts_lock);
return tmp_parts.contains(basename);
}
}

View File

@ -200,26 +200,6 @@ private:
ITTLMergeSelector::PartitionIdToTTLs next_recompress_ttl_merge_times_by_partition;
/// Performing TTL merges independently for each partition guarantees that
/// there is only a limited number of TTL merges and no partition stores data, that is too stale
public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
///
/// NOTE: that it accept basename (i.e. dirname), not the path,
/// since later requires canonical form.
bool hasTemporaryPart(const std::string & basename) const;
private:
/// Set of active temporary paths that is used as the destination.
/// List of such paths is required to avoid trying to remove them during cleanup.
///
/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> tmp_parts;
/// Lock for "tmp_parts".
///
/// NOTE: mutable is required to mark hasTemporaryPath() const
mutable std::mutex tmp_parts_lock;
};

View File

@ -64,7 +64,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// Both use relative_data_path which changes during rename, so we
/// do it under share lock
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.merger_mutator, storage.getSettings()->temporary_directories_lifetime.totalSeconds());
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
}
/// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@ -0,0 +1,24 @@
#include <Storages/MergeTree/TemporaryParts.h>
namespace DB
{
bool TemporaryParts::contains(const std::string & basename) const
{
std::lock_guard lock(mutex);
return parts.contains(basename);
}
void TemporaryParts::add(std::string basename)
{
std::lock_guard lock(mutex);
parts.emplace(std::move(basename));
}
void TemporaryParts::remove(const std::string & basename)
{
std::lock_guard lock(mutex);
parts.erase(basename);
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <mutex>
#include <string>
#include <unordered_set>
namespace DB
{
/// Manages the set of active temporary paths that should not be cleaned up by the background thread.
class TemporaryParts : private boost::noncopyable
{
private:
/// Mutable to allow the const qualifier on contains().
mutable std::mutex mutex;
/// NOTE: It is pretty short, so using the STL is fine.
std::unordered_set<std::string> parts;
public:
/// Returns true if the passed part name is active
/// (i.e. it is the destination of an active mutation/merge).
///
/// NOTE: it accepts the basename (i.e. dirname), not the full path,
/// since the latter requires canonical form.
bool contains(const std::string & basename) const;
void add(std::string basename);
void remove(const std::string & basename);
};
}
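
For context, a minimal, self-contained sketch of how this registry is meant to be used. The TemporaryParts API (contains/add/remove) mirrors the header above; the standalone clearStaleTmpDirectories() helper and its list of directory basenames are illustrative assumptions, not the actual MergeTreeData code.

#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_set>
#include <vector>

/// Mirror of the TemporaryParts registry declared above.
class TemporaryParts
{
    mutable std::mutex mutex;
    std::unordered_set<std::string> parts;
public:
    bool contains(const std::string & basename) const { std::lock_guard lock(mutex); return parts.contains(basename); }
    void add(std::string basename) { std::lock_guard lock(mutex); parts.emplace(std::move(basename)); }
    void remove(const std::string & basename) { std::lock_guard lock(mutex); parts.erase(basename); }
};

/// Illustrative cleanup pass: tmp_* directories that are still owned by a running
/// merge/mutation/INSERT (i.e. registered in TemporaryParts) are skipped, not deleted.
size_t clearStaleTmpDirectories(const TemporaryParts & temporary_parts, const std::vector<std::string> & tmp_dir_basenames)
{
    size_t cleared = 0;
    for (const auto & basename : tmp_dir_basenames)
    {
        if (temporary_parts.contains(basename))
            continue; /// still in use, keep it
        /// ... remove the directory from disk here ...
        ++cleared;
    }
    return cleared;
}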

View File

@ -1000,7 +1000,8 @@ void StorageBuffer::reschedule()
size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min(min, max) * 1000);
size_t flush = std::max<ssize_t>(flush_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min({min, max, flush}) * 1000);
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
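
As a side note, the rescheduling arithmetic after this change can be summarized with the sketch below. The three threshold names, the 1-second clamp and the millisecond conversion mirror the diff; the standalone helper function itself is only an illustrative assumption.

#include <algorithm>
#include <cstddef>
#include <cstdint>

/// Next wake-up of the background flush, in milliseconds: take the nearest of the
/// min/max/flush time thresholds, but never earlier than one second from now.
size_t nextBufferFlushDelayMs(int64_t min_time, int64_t max_time, int64_t flush_time, int64_t time_passed)
{
    size_t min = std::max<int64_t>(min_time - time_passed, 1);
    size_t max = std::max<int64_t>(max_time - time_passed, 1);
    size_t flush = std::max<int64_t>(flush_time - time_passed, 1);
    return std::min({min, max, flush}) * 1000;
}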

View File

@ -56,6 +56,8 @@
#include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
@ -118,6 +120,7 @@ namespace ErrorCodes
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -705,6 +708,9 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@ -719,28 +725,60 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
/// Unwrap view() function.
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
new_query->select = select_with_union_query;
}
}
}
}
}
if (!storage_src || storage_src->getClusterName() != getClusterName())
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of the cluster name, to handle remote()/cluster()
/// (since for remote()/cluster() getClusterName() returns an empty string).
if (src_addresses != dst_addresses)
{
/// The warning should be produced only for root queries:
/// with parallel_distributed_insert_select=1 the rewritten insert
/// would otherwise produce the same warning,
/// since its destination table is still Distributed there.
if (local_context->getClientInfo().distributed_depth == 0)
{
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_addresses.size(),
getClusterName(),
dst_addresses.size());
}
return nullptr;
}
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
const auto & cluster = getCluster();
@ -757,12 +795,15 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
new_query_str = buf.str();
}
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (size_t shard_index : collections::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_index];
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, local_context);
InterpreterInsertQuery interpreter(new_query, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(interpreter.execute().pipeline);
}
@ -776,7 +817,7 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
/// INSERT SELECT query returns empty block
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context);
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(Pipe(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote)));
pipelines.back()->setSinks([](const Block & header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
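
To make the intent of the new depth handling explicit, here is a small sketch of the guard added at the top of distributedWrite() together with the depth bump applied to the query context used for the per-shard INSERTs. The struct and function names are assumptions for illustration, not the real Context/Settings classes.

#include <stdexcept>

struct ClientInfo { unsigned distributed_depth = 0; };
struct Settings { unsigned max_distributed_depth = 5; };

/// Refuse to rewrite the INSERT once the configured recursion limit is reached
/// (max_distributed_depth = 0 disables the check), then record one more level of
/// depth on the context that will execute the per-shard inserts.
void guardAndBumpDistributedDepth(const Settings & settings, ClientInfo & client_info)
{
    if (settings.max_distributed_depth && client_info.distributed_depth >= settings.max_distributed_depth)
        throw std::runtime_error("Maximum distributed depth exceeded");
    ++client_info.distributed_depth;
}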

View File

@ -114,8 +114,6 @@ public:
/// Used by InterpreterInsertQuery
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
/// Returns empty string if tables is used by TableFunctionRemote
std::string getClusterName() const { return cluster_name; }
ClusterPtr getCluster() const;
/// Used by InterpreterSystemQuery
@ -201,6 +199,7 @@ private:
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
std::string getClusterName() const { return cluster_name.empty() ? "<remote>" : cluster_name; }
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

View File

@ -125,7 +125,7 @@ void StorageMergeTree::startup()
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup_parts.restart();
@ -1163,7 +1163,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
assignee.scheduleCommonTask(ExecutableLambdaAdapter::create(
[this, share_lock] ()
{
return clearOldTemporaryDirectories(merger_mutator, getSettings()->temporary_directories_lifetime.totalSeconds());
return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}

View File

@ -451,7 +451,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow to reinitialize them, delete each of them immediately.
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);
clearOldWriteAheadLogs();
}

View File

@ -4,14 +4,18 @@ if (TARGET ch_contrib::hivemetastore)
add_headers_and_sources(clickhouse_table_functions Hive)
endif ()
list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunctionFactory.cpp)
list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h)
list(REMOVE_ITEM clickhouse_table_functions_sources
ITableFunction.cpp
TableFunctionView.cpp
TableFunctionFactory.cpp)
list(REMOVE_ITEM clickhouse_table_functions_headers
ITableFunction.h
TableFunctionView.h
TableFunctionFactory.h)
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
if (TARGET ch_contrib::hivemetastore)
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms ch_contrib::hivemetastore ch_contrib::hdfs)
else ()
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs)
endif ()

View File

@ -15,6 +15,12 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
const ASTSelectWithUnionQuery & TableFunctionView::getSelectQuery() const
{
return *create.select;
}
void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/)
{
const auto * function = ast_function->as<ASTFunction>();

View File

@ -16,6 +16,9 @@ class TableFunctionView : public ITableFunction
public:
static constexpr auto name = "view";
std::string getName() const override { return name; }
const ASTSelectWithUnionQuery & getSelectQuery() const;
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "View"; }

View File

@ -231,7 +231,6 @@ CI_CONFIG = {
},
"Stateful tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateful tests (release, DatabaseOrdinary, actions)": {
"required_build": "package_release",
@ -259,7 +258,6 @@ CI_CONFIG = {
},
"Stateless tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateless tests (release, wide parts enabled, actions)": {
"required_build": "package_release",

View File

@ -27,37 +27,52 @@ def started_cluster():
def test_create_parquet_table(started_cluster):
logging.info('Start testing creating hive table ...')
node = started_cluster.instances['h0_0_0']
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet;
CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day);
test_passed = False
for i in range(10):
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet;
CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day)
""")
logging.info("create result {}".format(result))
time.sleep(120)
assert result.strip() == ''
logging.info("create result {}".format(result))
if result.strip() == '':
test_passed = True
break
time.sleep(60)
assert test_passed
def test_create_parquet_table_1(started_cluster):
logging.info('Start testing creating hive table ...')
node = started_cluster.instances['h0_0_0']
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet_parts;
CREATE TABLE default.demo_parquet_parts (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String), `hour` String) ENGINE = Hive('thrift://hivetest:9083', 'test', 'parquet_demo') PARTITION BY(day, hour);
    test_passed = False
    for i in range(10):
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet_parts;
CREATE TABLE default.demo_parquet_parts (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String), `hour` String) ENGINE = Hive('thrift://hivetest:9083', 'test', 'parquet_demo') PARTITION BY(day, hour);
""")
logging.info("create result {}".format(result))
time.sleep(120)
assert result.strip() == ''
logging.info("create result {}".format(result))
if result.strip() == '':
test_passed = True
break
time.sleep(60)
assert test_passed
def test_create_orc_table(started_cluster):
logging.info('Start testing creating hive table ...')
node = started_cluster.instances['h0_0_0']
result = node.query("""
test_passed = False
for i in range(10):
result = node.query("""
DROP TABLE IF EXISTS default.demo_orc;
CREATE TABLE default.demo_orc (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo_orc') PARTITION BY(day)
""")
logging.info("create result {}".format(result))
logging.info("create result {}".format(result))
if result.strip() == '':
test_passed = True
break
time.sleep(60)
assert result.strip() == ''
assert test_passed
def test_create_text_table(started_cluster):
logging.info('Start testing creating hive table ...')
@ -146,25 +161,20 @@ def test_parquet_groupby_by_hive_function(started_cluster):
def test_cache_read_bytes(started_cluster):
node = started_cluster.instances['h0_0_0']
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet;
CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day)
CREATE TABLE IF NOT EXISTS default.demo_parquet_1 (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day)
""")
result = node.query("""
SELECT day, count(*) FROM default.demo_parquet group by day order by day
test_passed = False
for i in range(10):
result = node.query("""
SELECT day, count(*) FROM default.demo_parquet_1 group by day order by day settings input_format_parquet_allow_missing_columns = true
""")
result = node.query("""
SELECT day, count(*) FROM default.demo_parquet group by day order by day
""")
expected_result = """2021-11-01 1
2021-11-05 2
2021-11-11 1
2021-11-16 2
"""
time.sleep(120)
assert result == expected_result
result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0")
logging.info("Read bytes from cache:{}".format(result))
assert result.strip() != '0'
node.query("system flush logs")
result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0")
if result.strip() == '0':
logging.info("ProfileEvent_ExternalDataSourceLocalCacheReadBytes == 0")
time.sleep(10)
continue
test_passed = True
break
assert test_passed

View File

@ -42,6 +42,7 @@ not compatible
= compression
1000
1000
1000
= other
0
1000

View File

@ -50,8 +50,12 @@ echo '=' compression
cat "$DATA_DIR"/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat "$DATA_DIR"/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
#snappy is optional
#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
# snappy is optional
if [ "$( ${CLICKHOUSE_LOCAL} -q "SELECT value FROM system.build_options where name = 'USE_SNAPPY' LIMIT 1")" == "1" ]; then
cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
else
echo 1000
fi
echo '=' other
#no data

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash
# Tags: no-asan, no-msan, no-fasttest
# Tags: no-asan, no-msan, no-fasttest, no-cpu-aarch64
# Tag no-msan: can't pass because odbc libraries are not instrumented
# Tag no-cpu-aarch64: clickhouse-odbc is not setup for arm
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -2,8 +2,8 @@
set parallel_view_processing=1;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
8
select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
1
select count() from testX;
10
select count() from testXA;
@ -15,8 +15,8 @@ select count() from testXC;
set parallel_view_processing=0;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
5
select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
1
select count() from testX;
20
select count() from testXA;

View File

@ -15,7 +15,7 @@ create materialized view testXC engine=MergeTree order by tuple() as select slee
set parallel_view_processing=1;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
select length(thread_ids) >= 8 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '1';
select count() from testX;
select count() from testXA;
@ -25,7 +25,7 @@ select count() from testXC;
set parallel_view_processing=0;
insert into testX select number from numbers(10) settings log_queries=1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system flush logs;
select length(thread_ids) from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
select length(thread_ids) >= 5 from system.query_log where current_database = currentDatabase() and type != 'QueryStart' and query like '%insert into testX %' and Settings['parallel_view_processing'] = '0';
select count() from testX;
select count() from testXA;

View File

@ -12,3 +12,5 @@ SELECT '1,,' == replaceRegexpOne('1,,', '^[,]*|[,]*$', '') x;
SELECT '5935,5998,6014' == trim(BOTH ', ' FROM '5935,5998,6014, ') x;
SELECT '5935,5998,6014' == replaceRegexpAll('5935,5998,6014, ', concat('^[', regexpQuoteMeta(', '), ']*|[', regexpQuoteMeta(', '), ']*$'), '') AS x;
SELECT trim(BOTH '"' FROM '2') == '2'

View File

@ -0,0 +1,27 @@
-- { echoOn }
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH }
select * from dst_02224;
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=2;
select * from dst_02224;
1
1
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
1
1
truncate table dst_02224;
insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key)
select * from remote('127.{1,2}', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
1
1

View File

@ -0,0 +1,34 @@
drop table if exists dst_02224;
drop table if exists src_02224;
create table dst_02224 (key Int) engine=Memory();
create table src_02224 (key Int) engine=Memory();
insert into src_02224 values (1);
-- { echoOn }
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH }
select * from dst_02224;
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=2;
select * from dst_02224;
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
truncate table dst_02224;
insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key)
select * from remote('127.{1,2}', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
-- { echoOff }
drop table src_02224;
drop table dst_02224;

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# NOTE: sh test is required since view() does not have current database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists dst_02225;
drop table if exists src_02225;
create table dst_02225 (key Int) engine=Memory();
create table src_02225 (key Int) engine=Memory();
insert into src_02225 values (1);
"
$CLICKHOUSE_CLIENT --param_database=$CLICKHOUSE_DATABASE -nm -q "
truncate table dst_02225;
insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key)
select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225), key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02225;
-- w/o sharding key
truncate table dst_02225;
insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key)
select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225))
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02225;
"
$CLICKHOUSE_CLIENT -nm -q "
drop table src_02225;
drop table dst_02225;
"

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data_02226;
create table data_02226 (key Int) engine=MergeTree() order by key
as select * from numbers(1);
"
# Regression for:
#
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
opts=(
--allow_experimental_parallel_reading_from_replicas 1
--max_parallel_replicas 3
--iterations 1
)
$CLICKHOUSE_BENCHMARK --query "select * from remote('127.1', $CLICKHOUSE_DATABASE, data_02226)" "${opts[@]}" >& /dev/null
ret=$?
$CLICKHOUSE_CLIENT -nm -q "
drop table data_02226;
"
exit $ret

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS lower_test;
CREATE TABLE lower_test (
a Int32,
b String
) ENGINE=MergeTree
PARTITION BY b
ORDER BY a;
INSERT INTO lower_test (a,b) VALUES (1,'A'),(2,'B'),(3,'C');
SELECT a FROM lower_test WHERE lower(b) IN ('a','b') order by a;
DROP TABLE lower_test;

View File

@ -15,7 +15,7 @@
<div class="row mb-3">
<div class="col">
<a href="/" title="Main page" class="float-left mr-3">
<img src="/images/logo.svg" alt="ClickHouse" />
<img src="/docs/images/logo.svg" alt="ClickHouse" />
</a>
<h1>Performance comparison of analytical DBMS</h1>
</div>

View File

@ -15,7 +15,7 @@
<div class="row mb-3">
<div class="col d-flex align-items-center">
<a href="/" title="Main page" class="float-left mr-3">
<img src="/images/logo.svg" alt="ClickHouse" />
<img src="/docs/images/logo.svg" alt="ClickHouse" />
</a>
<h1 class="h2 mb-0">{{ title }}</h1>
</div>

View File

@ -1 +0,0 @@
#main{padding-bottom:0;padding-top:0}#wrapper{max-width:1078px;padding:0}body>#wrapper>#main>#wrapper>#content,body>#wrapper>#main>#wrapper>#logo,body>#wrapper>#main>#wrapper>h1{display:none}body>#wrapper>#main>#wrapper>#board_title{margin-top:0}body>#wrapper>#main>#logo{margin-top:80px}body>#wrapper>#main>:last-child{margin-bottom:120px}

View File

@ -55,7 +55,7 @@
$('pre').each(function(_, element) {
$(element).prepend(
'<img src="/images/mkdocs/copy.svg" alt="Copy" title="Copy" class="code-copy btn float-right m-0 p-0" />'
'<img src="/docs/images/mkdocs/copy.svg" alt="Copy" title="Copy" class="code-copy btn float-right m-0 p-0" />'
);
});

View File

@ -1,27 +0,0 @@
#main {
padding-bottom: 0;
padding-top: 0;
}
#wrapper {
max-width: 1078px;
padding: 0;
}
body > #wrapper > #main > #wrapper > #logo,
body > #wrapper > #main > #wrapper > h1,
body > #wrapper > #main > #wrapper > #content {
display: none;
}
body > #wrapper > #main > #wrapper > #board_title {
margin-top: 0;
}
body > #wrapper > #main > #logo {
margin-top: 80px;
}
body > #wrapper > #main > :last-child {
margin-bottom: 120px;
}

View File

@ -1,4 +1,4 @@
<link href="/css/base.css?css_digest" media="all" rel="stylesheet" />
<link href="/docs/css/base.css?css_digest" media="all" rel="stylesheet" />
{% for src in extra_css %}
<link href="{{ src }}" media="all" rel="stylesheet" />

View File

@ -1,4 +1,4 @@
<script{% if not extra_js %} async{% endif %} type="text/javascript" src="/js/base.js?js_digest"></script>
<script{% if not extra_js %} async{% endif %} type="text/javascript" src="/docs/js/base.js?js_digest"></script>
{% for src in extra_js %}
<script type="text/javascript" src="{{ src }}"></script>

View File

@ -7,7 +7,7 @@
<title>{% if title %}{{ title }}{% else %}{{ _('ClickHouse - fast open-source OLAP DBMS') }}{% endif %}</title>
<link rel="shortcut icon" href="/favicon.ico" />
<link rel="apple-touch-icon" sizes="180x180" href="/images/logo-180x180.png" />
<link rel="apple-touch-icon" sizes="180x180" href="/docs/images/logo-180x180.png" />
<meta property="og:title" content="{% if title %}{{ title }}{% else %}{{ _('ClickHouse DBMS') }}{% endif %}"/>
<meta property="og:description" content="{{ description }}" />
@ -15,7 +15,7 @@
{% if page and page.meta.image %}
<meta property="og:image" content="{{ page.meta.image}}" />
{% else %}
<meta property="og:image" content="https://clickhouse.com/images/logo.png" />
<meta property="og:image" content="https://clickhouse.com/docs/images/logo.png" />
{% endif %}
<meta property="og:url" content="{{ url or 'https://clickhouse.com/' }}"/>
<link rel="canonical" href="{{ canonical_url or 'https://clickhouse.com/' }}" />

View File

@ -20,7 +20,7 @@
<div class="row pt-3 mb-3">
<div class="col">
<a href="/" class="text-decoration-none">
<amp-img class="d-inline-block mr-3" layout="fixed" width="40" height="36" src="/images/logo.svg" alt="ClickHouse logo" title="ClickHouse logo"></amp-img><amp-img class="invert-dark d-inline-block" layout="fixed" width="238" height="36" src="/images/clickhouse-black.svg" alt="ClickHouse" title="ClickHouse"></amp-img>
<amp-img class="d-inline-block mr-3" layout="fixed" width="40" height="36" src="/docs/images/logo.svg" alt="ClickHouse logo" title="ClickHouse logo"></amp-img><amp-img class="invert-dark d-inline-block" layout="fixed" width="238" height="36" src="/docs/images/clickhouse-black.svg" alt="ClickHouse" title="ClickHouse"></amp-img>
</a>
</div>
</div>

View File

@ -1,7 +1,7 @@
<nav id="top-nav" class="navbar py-1 navbar-dark navbar-expand-md bg-dark-alt text-white fixed-top">
<div class="container-fluid justify-content-between">
<a class="d-block navbar-brand mr-3" href="/">
<img id="docs-logo-icon" src="/images/logo.svg" alt="ClickHouse logo" title="ClickHouse logo"/>
<img id="docs-logo-icon" src="/docs/images/logo.svg" alt="ClickHouse logo" title="ClickHouse logo"/>
</a>
<div class="w-100 navbar-text text-left d-none d-md-block">
<h1 class="h3 m-0 p-0 d-inline">
@ -24,24 +24,24 @@
{% endif %}
{% if edit_url %}
<li id="edit-wrapper" class="nav-item">
<a id="edit-link" href="{{ edit_url }}" title="Edit this article" class="nav-link" rel="external nofollow noreferrer" target="_blank"><img src="/images/mkdocs/edit.svg" alt="Edit this article" height="60" /></a>
<a id="edit-link" href="{{ edit_url }}" title="Edit this article" class="nav-link" rel="external nofollow noreferrer" target="_blank"><img src="/docs/images/mkdocs/edit.svg" alt="Edit this article" height="60" /></a>
</li>
{% endif %}
<li class="nav-item">
<a class="nav-link" href="https://github.com/ClickHouse/ClickHouse" rel="external nofollow noreferrer">
<img src="/images/index/github.svg" alt="GitHub repository" title="Go to GitHub" height="72" />
<img src="/docs/images/index/github.svg" alt="GitHub repository" title="Go to GitHub" height="72" />
</a>
</li>
<li id="languages-wrapper" class="nav-item dropdown mr-3">
<div id="languages-dropdown">
<a class="nav-link dropdown-toggle d-inline-block mt-1 text-muted" href="#" id="lang-dropdown" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">
<img src="/images/flags/{{ language }}.svg" alt="{{ config.extra.languages[language] }}" title="{{ config.extra.languages[language] }}" width="60" class="d-inline-block mt-n1" />
<img src="/docs/images/flags/{{ language }}.svg" alt="{{ config.extra.languages[language] }}" title="{{ config.extra.languages[language] }}" width="60" class="d-inline-block mt-n1" />
</a>
<div class="dropdown-menu bg-dark" aria-labelledby="lang-dropdown">
{% for code, name in config.extra.languages.items() %}
<a class="dropdown-item{% if language == code %} disabled{% endif %}"
href="/docs/{{ code }}/{{ page.url }}">
<img src="/images/flags/{{ code }}.svg" alt="" title="" width="32" class="d-inline-block mr-2" />{{ name }}
<img src="/docs/images/flags/{{ code }}.svg" alt="" title="" width="32" class="d-inline-block mr-2" />{{ name }}
</a>
{% endfor %}
</div>
@ -54,7 +54,7 @@
<div class="input-group">
<div class="input-group-prepend w-100">
<span class="input-group-text bg-secondary-alt text-muted border-0 mr-n1" id="search-icon">
<img src="/images/mkdocs/search.svg" />
<img src="/docs/images/mkdocs/search.svg" />
</span>
<input id="docsearch-input" class="form-control bg-secondary-alt text-muted border-0 pl-1" type="search" placeholder="Search" aria-label="Search">
</div>

View File

@ -2,10 +2,10 @@
<div class="overflow-auto">
<div id="single-page-switch" class="btn-group mb-2 float-right" role="group" aria-label="{{ _('Multi-page or single-page') }}">
<a href="{{ base_url }}" role="button" class="btn btn-dark{% if config.extra.single_page %}-alt{% endif %} active" aria-pressed="{% if not config.extra.single_page %}true{% else %}false{% endif %}">
<img src="/images/mkdocs/multi.svg" alt="{{ _('Multi-page version') }}" title="{{ _('Multi-page version') }}" />
<img src="/docs/images/mkdocs/multi.svg" alt="{{ _('Multi-page version') }}" title="{{ _('Multi-page version') }}" />
</a>
<a href="{{ base_url }}/single/" role="button" class="btn btn-dark{% if not config.extra.single_page %}-alt{% endif %} active" aria-pressed="{% if not config.extra.single_page %}true{% else %}false{% endif %}">
<img src="/images/mkdocs/single.svg" alt="{{ _('Single-page version') }}" title="{{ _('Single-page version') }}" />
<img src="/docs/images/mkdocs/single.svg" alt="{{ _('Single-page version') }}" title="{{ _('Single-page version') }}" />
</a>
</div>
{% if not single_page %}