Merge branch 'master' into PF202202091030

This commit is contained in:
mergify[bot] 2022-02-09 16:27:17 +00:00 committed by GitHub
commit 6925e3d2bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 1163 additions and 285 deletions

View File

@ -120,7 +120,7 @@ The `mail` and `phone` fields are of type String, but the `icq` field is `UInt32
Get the first available contact method for the customer from the contact list:
``` sql
SELECT coalesce(mail, phone, CAST(icq,'Nullable(String)')) FROM aBook;
SELECT name, coalesce(mail, phone, CAST(icq,'Nullable(String)')) FROM aBook;
```
``` text

View File

@ -22,7 +22,7 @@ tuple(x, y, …)
## tupleElement {#tupleelement}
A function that allows getting a column from a tuple.
N is the column index, starting from 1. N must be a constant. N must be a constant. N must be a strict postive integer no greater than the size of the tuple.
N is the column index, starting from 1. N must be a constant. N must be a strict postive integer no greater than the size of the tuple.
There is no cost to execute the function.
The function implements the operator `x.N`.

View File

@ -481,7 +481,19 @@ catch (...)
void Client::connect()
{
connection_parameters = ConnectionParameters(config());
UInt16 default_port = ConnectionParameters::getPortFromConfig(config());
connection_parameters = ConnectionParameters(config(), hosts_ports[0].host,
hosts_ports[0].port.value_or(default_port));
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
for (size_t attempted_address_index = 0; attempted_address_index < hosts_ports.size(); ++attempted_address_index)
{
connection_parameters.host = hosts_ports[attempted_address_index].host;
connection_parameters.port = hosts_ports[attempted_address_index].port.value_or(default_port);
if (is_interactive)
std::cout << "Connecting to "
@ -490,11 +502,6 @@ void Client::connect()
<< connection_parameters.host << ":" << connection_parameters.port
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
try
{
connection = Connection::createConnection(connection_parameters, global_context);
@ -507,10 +514,14 @@ void Client::connect()
connection->getServerVersion(
connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
config().setString("host", connection_parameters.host);
config().setInt("port", connection_parameters.port);
break;
}
catch (const Exception & e)
{
/// It is typical when users install ClickHouse, type some password and instantly forget it.
/// This problem can't be fixed with reconnection so it is not attempted
if ((connection_parameters.user.empty() || connection_parameters.user == "default")
&& e.code() == DB::ErrorCodes::AUTHENTICATION_FAILED)
{
@ -520,10 +531,27 @@ void Client::connect()
<< "and deleting this file will reset the password." << std::endl
<< "See also /etc/clickhouse-server/users.xml on the server where ClickHouse is installed." << std::endl
<< std::endl;
}
throw;
}
else
{
if (attempted_address_index == hosts_ports.size() - 1)
throw;
if (is_interactive)
{
std::cerr << "Connection attempt to database at "
<< connection_parameters.host << ":" << connection_parameters.port
<< " resulted in failure"
<< std::endl
<< getExceptionMessage(e, false)
<< std::endl
<< "Attempting connection to the next provided address"
<< std::endl;
}
}
}
}
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
load_suggestions = is_interactive && (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false));
@ -966,8 +994,11 @@ void Client::addOptions(OptionsDescription & options_description)
/// Main commandline options related to client functionality and all parameters from Settings.
options_description.main_description->add_options()
("config,c", po::value<std::string>(), "config-file path (another shorthand)")
("host,h", po::value<std::string>()->default_value("localhost"), "server host")
("port", po::value<int>()->default_value(9000), "server port")
("host,h", po::value<std::vector<HostPort>>()->multitoken()->default_value({{"localhost"}}, "localhost"),
"list of server hosts with optionally assigned port to connect. List elements are separated by a space."
"Every list element looks like '<host>[:<port>]'. If port isn't assigned, connection is made by port from '--port' param"
"Example of usage: '-h host1:1 host2 host3:3'")
("port", po::value<int>()->default_value(9000), "server port, which is default port for every host from '--host' param")
("secure,s", "Use TLS connection")
("user,u", po::value<std::string>()->default_value("default"), "user")
/** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
@ -1074,8 +1105,8 @@ void Client::processOptions(const OptionsDescription & options_description,
if (options.count("config"))
config().setString("config-file", options["config"].as<std::string>());
if (options.count("host") && !options["host"].defaulted())
config().setString("host", options["host"].as<std::string>());
if (options.count("host"))
hosts_ports = options["host"].as<std::vector<HostPort>>();
if (options.count("interleave-queries-file"))
interleave_queries_files = options["interleave-queries-file"].as<std::vector<std::string>>();
if (options.count("port") && !options["port"].defaulted())

View File

@ -1929,7 +1929,7 @@ void ClientBase::init(int argc, char ** argv)
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
|| (options.count("host") && options["host"].as<std::vector<HostPort>>()[0].host == "elp")) /// If user writes -help instead of --help.
{
printHelpMessage(options_description);
exit(0);

View File

@ -5,6 +5,7 @@
#include <Common/InterruptListener.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Common/DNSResolver.h>
#include <Core/ExternalTable.h>
#include <Poco/Util/Application.h>
#include <Interpreters/Context.h>
@ -243,6 +244,25 @@ protected:
} profile_events;
QueryProcessingStage::Enum query_processing_stage;
struct HostPort
{
String host;
std::optional<UInt16> port{};
friend std::istream & operator>>(std::istream & in, HostPort & hostPort)
{
String host_with_port;
in >> host_with_port;
DB::DNSResolver & resolver = DB::DNSResolver::instance();
std::pair<Poco::Net::IPAddress, std::optional<UInt16>>
host_and_port = resolver.resolveHostOrAddress(host_with_port);
hostPort.host = host_and_port.first.toString();
hostPort.port = host_and_port.second;
return in;
}
};
std::vector<HostPort> hosts_ports{};
};
}

View File

@ -23,15 +23,13 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config,
std::string connection_host,
int connection_port) : host(connection_host), port(connection_port)
{
bool is_secure = config.getBool("secure", false);
security = is_secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
host = config.getString("host", "localhost");
port = config.getInt(
"port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");
/// changed the default value to "default" to fix the issue when the user in the prompt is blank
@ -69,4 +67,17 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
}
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
: ConnectionParameters(config, config.getString("host", "localhost"), getPortFromConfig(config))
{
}
int ConnectionParameters::getPortFromConfig(const Poco::Util::AbstractConfiguration & config)
{
bool is_secure = config.getBool("secure", false);
return config.getInt("port",
config.getInt(is_secure ? "tcp_port_secure" : "tcp_port",
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
}
}

View File

@ -24,6 +24,9 @@ struct ConnectionParameters
ConnectionParameters() {}
ConnectionParameters(const Poco::Util::AbstractConfiguration & config);
ConnectionParameters(const Poco::Util::AbstractConfiguration & config, std::string host, int port);
static int getPortFromConfig(const Poco::Util::AbstractConfiguration & config);
};
}

View File

@ -202,6 +202,45 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
}
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> DNSResolver::resolveHostOrAddress(const std::string & host_and_port)
{
Poco::Net::IPAddress ip;
size_t number_of_colons = std::count(host_and_port.begin(), host_and_port.end(), ':');
if (number_of_colons > 1)
{
/// IPv6 host
if (host_and_port.starts_with('['))
{
size_t close_bracket_pos = host_and_port.find(']');
assert(close_bracket_pos != std::string::npos);
ip = resolveHost(host_and_port.substr(0, close_bracket_pos));
if (close_bracket_pos == host_and_port.size() - 1)
return {ip, std::nullopt};
if (host_and_port[close_bracket_pos + 1] != ':')
throw Exception("Missing delimiter between host and port", ErrorCodes::BAD_ARGUMENTS);
unsigned int port;
if (!Poco::NumberParser::tryParseUnsigned(host_and_port.substr(close_bracket_pos + 2), port))
throw Exception("Port must be numeric", ErrorCodes::BAD_ARGUMENTS);
if (port > 0xFFFF)
throw Exception("Port must be less 0xFFFF", ErrorCodes::BAD_ARGUMENTS);
return {ip, port};
}
return {resolveHost(host_and_port), std::nullopt};
}
else if (number_of_colons == 1)
{
/// IPv4 host with port
Poco::Net::SocketAddress socket = resolveAddress(host_and_port);
return {socket.host(), socket.port()};
}
/// IPv4 host
return {resolveHost(host_and_port), std::nullopt};
}
String DNSResolver::reverseResolve(const Poco::Net::IPAddress & address)
{
if (impl->disable_cache)

View File

@ -34,6 +34,10 @@ public:
Poco::Net::SocketAddress resolveAddress(const std::string & host, UInt16 port);
/// Accepts host names like 'example.com'/'example.com:port' or '127.0.0.1'/'127.0.0.1:port' or '::1'/'[::1]:port'
/// and resolves its IP and port, if port is set
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> resolveHostOrAddress(const std::string & host_and_port);
/// Accepts host IP and resolves its host name
String reverseResolve(const Poco::Net::IPAddress & address);

View File

@ -281,6 +281,10 @@
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
\
M(MainConfigLoads, "Number of times the main configuration was reloaded.") \
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
namespace ProfileEvents
{

View File

@ -70,7 +70,9 @@ class IColumn;
M(UInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 32*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 1000, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
@ -262,6 +264,7 @@ class IColumn;
M(UInt64, http_max_fields, 1000000, "Maximum number of fields in HTTP header", 0) \
M(UInt64, http_max_field_name_size, 1048576, "Maximum length of field name in HTTP header", 0) \
M(UInt64, http_max_field_value_size, 1048576, "Maximum length of field value in HTTP header", 0) \
M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \
M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \
M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \

View File

@ -283,6 +283,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
bucket,
metadata.remote_fs_root_path + s3_path,
settings->s3_min_upload_part_size,
settings->s3_upload_part_size_multiply_factor,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size,
@ -338,6 +340,8 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev
bucket,
remote_fs_root_path + key,
settings->s3_min_upload_part_size,
settings->s3_upload_part_size_multiply_factor,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
metadata);
@ -417,6 +421,8 @@ void DiskS3::saveSchemaVersion(const int & version)
bucket,
remote_fs_root_path + SCHEMA_VERSION_OBJECT,
settings->s3_min_upload_part_size,
settings->s3_upload_part_size_multiply_factor,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size);
writeIntText(version, buffer);
@ -1076,6 +1082,8 @@ DiskS3Settings::DiskS3Settings(
const std::shared_ptr<Aws::S3::S3Client> & client_,
size_t s3_max_single_read_retries_,
size_t s3_min_upload_part_size_,
size_t s3_upload_part_size_multiply_factor_,
size_t s3_upload_part_size_multiply_parts_count_threshold_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
@ -1085,6 +1093,8 @@ DiskS3Settings::DiskS3Settings(
: client(client_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, s3_min_upload_part_size(s3_min_upload_part_size_)
, s3_upload_part_size_multiply_factor(s3_upload_part_size_multiply_factor_)
, s3_upload_part_size_multiply_parts_count_threshold(s3_upload_part_size_multiply_parts_count_threshold_)
, s3_max_single_part_upload_size(s3_max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, send_metadata(send_metadata_)

View File

@ -29,6 +29,8 @@ struct DiskS3Settings
const std::shared_ptr<Aws::S3::S3Client> & client_,
size_t s3_max_single_read_retries_,
size_t s3_min_upload_part_size_,
size_t s3_upload_part_size_multiply_factor_,
size_t s3_upload_part_size_multiply_parts_count_threshold_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
@ -39,6 +41,8 @@ struct DiskS3Settings
std::shared_ptr<Aws::S3::S3Client> client;
size_t s3_max_single_read_retries;
size_t s3_min_upload_part_size;
size_t s3_upload_part_size_multiply_factor;
size_t s3_upload_part_size_multiply_parts_count_threshold;
size_t s3_max_single_part_upload_size;
size_t min_bytes_for_seek;
bool send_metadata;

View File

@ -155,6 +155,8 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
getClient(config, config_prefix, context),
config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries),
config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size),
config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor),
config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold),
config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),

View File

@ -82,6 +82,7 @@ struct ReadSettings
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;
bool http_skip_not_found_url_for_globs = true;
/// Set to true for MergeTree tables to make sure
/// that last position (offset in compressed file) is always passed.

View File

@ -129,6 +129,8 @@ namespace detail
/// In case of redirects, save result uri to use it if we retry the request.
std::optional<Poco::URI> saved_uri_redirect;
bool http_skip_not_found_url;
ReadSettings settings;
Poco::Logger * log;
@ -146,7 +148,7 @@ namespace detail
return read_range.begin + offset_from_begin_pos;
}
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
std::istream * callImpl(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri_.getPath().empty())
@ -211,7 +213,7 @@ namespace detail
{
try
{
call(uri, response, Poco::Net::HTTPRequest::HTTP_HEAD);
call(response, Poco::Net::HTTPRequest::HTTP_HEAD);
while (isRedirect(response.getStatus()))
{
@ -220,7 +222,7 @@ namespace detail
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
istr = callImpl(uri_redirect, response, method);
}
break;
@ -237,6 +239,17 @@ namespace detail
return read_range.end;
}
enum class InitializeError
{
/// If error is not retriable, `exception` variable must be set.
NON_RETRIABLE_ERROR,
/// Allows to skip not found urls for globs
SKIP_NOT_FOUND_URL,
NONE,
};
InitializeError initialization_error = InitializeError::NONE;
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -253,7 +266,8 @@ namespace detail
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false,
bool use_external_buffer_ = false)
bool use_external_buffer_ = false,
bool http_skip_not_found_url_ = false)
: SeekableReadBufferWithSize(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
@ -265,6 +279,7 @@ namespace detail
, buffer_size {buffer_size_}
, use_external_buffer {use_external_buffer_}
, read_range(read_range_)
, http_skip_not_found_url(http_skip_not_found_url_)
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{
@ -277,17 +292,45 @@ namespace detail
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
if (!delay_initialization)
{
initialize();
if (exception)
std::rethrow_exception(exception);
}
}
void call(Poco::Net::HTTPResponse & response, const String & method_)
{
try
{
istr = callImpl(saved_uri_redirect ? *saved_uri_redirect : uri, response, method_);
}
catch (...)
{
if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND
&& http_skip_not_found_url)
{
initialization_error = InitializeError::SKIP_NOT_FOUND_URL;
}
else
{
throw;
}
}
}
/**
* Note: In case of error return false if error is not retriable, otherwise throw.
* Throws if error is retriable, otherwise sets initialization_error = NON_RETRIABLE_ERROR and
* saves exception into `exception` variable. In case url is not found and skip_not_found_url == true,
* sets initialization_error = SKIP_NOT_FOUND_URL, otherwise throws.
*/
bool initialize()
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(saved_uri_redirect ? *saved_uri_redirect : uri, response, method);
call(response, method);
if (initialization_error != InitializeError::NONE)
return;
while (isRedirect(response.getStatus()))
{
@ -296,7 +339,7 @@ namespace detail
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
istr = callImpl(uri_redirect, response, method);
saved_uri_redirect = uri_redirect;
}
@ -310,7 +353,8 @@ namespace detail
Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
"Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-'));
return false;
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
return;
}
else if (read_range.end)
{
@ -345,12 +389,14 @@ namespace detail
sess->attachSessionData(e.message());
throw;
}
return true;
}
bool nextImpl() override
{
if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL)
return false;
assert(initialization_error == InitializeError::NONE);
if (next_callback)
next_callback(count());
@ -392,14 +438,16 @@ namespace detail
{
if (!impl)
{
/// If error is not retriable -- false is returned and exception is set.
/// Otherwise the error is thrown and retries continue.
bool initialized = initialize();
if (!initialized)
initialize();
if (initialization_error == InitializeError::NON_RETRIABLE_ERROR)
{
assert(exception);
break;
}
else if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL)
{
return false;
}
if (use_external_buffer)
{
@ -570,11 +618,12 @@ public:
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization_ = true,
bool use_external_buffer_ = false)
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, credentials_, method_, out_stream_callback_, buffer_size_,
settings_, http_header_entries_, read_range_, remote_host_filter_,
delay_initialization_, use_external_buffer_)
delay_initialization_, use_external_buffer_, skip_not_found_url_)
{
}
};

View File

@ -54,6 +54,8 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
size_t upload_part_size_multiply_factor_,
size_t upload_part_size_multiply_threshold_,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_,
@ -63,7 +65,9 @@ WriteBufferFromS3::WriteBufferFromS3(
, key(key_)
, object_metadata(std::move(object_metadata_))
, client_ptr(std::move(client_ptr_))
, minimum_upload_part_size(minimum_upload_part_size_)
, upload_part_size(minimum_upload_part_size_)
, upload_part_size_multiply_factor(upload_part_size_multiply_factor_)
, upload_part_size_multiply_threshold(upload_part_size_multiply_threshold_)
, max_single_part_upload_size(max_single_part_upload_size_)
, schedule(std::move(schedule_))
{
@ -85,9 +89,10 @@ void WriteBufferFromS3::nextImpl()
if (multipart_upload_id.empty() && last_part_size > max_single_part_upload_size)
createMultipartUpload();
if (!multipart_upload_id.empty() && last_part_size > minimum_upload_part_size)
if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
{
writePart();
allocateBuffer();
}
@ -96,6 +101,9 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::allocateBuffer()
{
if (total_parts_uploaded != 0 && total_parts_uploaded % upload_part_size_multiply_threshold == 0)
upload_part_size *= upload_part_size_multiply_factor;
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
temporary_buffer->exceptions(std::ios::badbit);
last_part_size = 0;
@ -239,6 +247,8 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
total_parts_uploaded++;
}
void WriteBufferFromS3::completeMultipartUpload()

View File

@ -47,6 +47,8 @@ public:
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
size_t upload_part_size_multiply_factor_,
size_t upload_part_size_multiply_threshold_,
size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
@ -85,11 +87,14 @@ private:
String key;
std::optional<std::map<String, String>> object_metadata;
std::shared_ptr<Aws::S3::S3Client> client_ptr;
size_t minimum_upload_part_size;
size_t max_single_part_upload_size;
size_t upload_part_size;
const size_t upload_part_size_multiply_factor;
const size_t upload_part_size_multiply_threshold;
const size_t max_single_part_upload_size;
/// Buffer to accumulate data.
std::shared_ptr<Aws::StringStream> temporary_buffer;
size_t last_part_size;
size_t last_part_size = 0;
std::atomic<size_t> total_parts_uploaded = 0;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.

View File

@ -855,12 +855,18 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
Arena * arena) const
{
/// `data_variants` will destroy the states of aggregate functions in the destructor
data_variants.aggregator = this;
data_variants.init(AggregatedDataVariants::Type::without_key);
AggregatedDataWithoutKey & res = data_variants.without_key;
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
@ -1623,15 +1629,32 @@ Block Aggregator::prepareBlockAndFill(
}
void Aggregator::addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
AggregatedDataVariants & data_variants,
MutableColumns & aggregate_columns) const
{
const auto & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
auto & data = data_variants.without_key;
size_t i = 0;
try
{
for (i = 0; i < params.aggregates_size; ++i)
{
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
}
catch (...)
{
/// Rollback
for (size_t rollback_i = 0; rollback_i < i; ++rollback_i)
{
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[rollback_i]);
column_aggregate_func.getData().pop_back();
}
throw;
}
data = nullptr;
}
void Aggregator::addArenasToAggregateColumns(

View File

@ -1138,12 +1138,12 @@ private:
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
static void executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
void executeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
Arena * arena) const;
template <typename Method>
void writeToTemporaryFileImpl(
@ -1307,7 +1307,7 @@ private:
NestedColumnsHolder & nested_columns_holder) const;
void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
AggregatedDataVariants & data_variants,
MutableColumns & aggregate_columns) const;
void addArenasToAggregateColumns(

View File

@ -3148,6 +3148,7 @@ ReadSettings Context::getReadSettings() const
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.http_skip_not_found_url_for_globs = settings.http_skip_not_found_url_for_globs;
res.mmap_cache = getMMappedFileCache().get();

View File

@ -1,9 +1,9 @@
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -18,7 +18,14 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ScalarSubqueriesGlobalCacheHit;
extern const Event ScalarSubqueriesLocalCacheHit;
extern const Event ScalarSubqueriesCacheMiss;
}
namespace DB
{
@ -72,40 +79,95 @@ static bool worthConvertingToLiteral(const Block & scalar)
return !useless_literal_types.count(scalar_type_name);
}
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqueriesMatcher::Data & data)
{
auto hash = subquery.getTreeHash();
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
Block scalar;
if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str))
{
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
}
else if (data.scalars.count(scalar_query_hash_str))
{
scalar = data.scalars[scalar_query_hash_str];
}
else
{
auto subquery_context = Context::createCopy(data.getContext());
Settings subquery_settings = data.getContext()->getSettings();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);
if (!data.only_analyze && subquery_context->hasQueryContext())
{
/// Save current cached scalars in the context before analyzing the query
/// This is specially helpful when analyzing CTE scalars
auto context = subquery_context->getQueryContext();
for (const auto & it : data.scalars)
context->addScalar(it.first, it.second);
}
ASTPtr subquery_select = subquery.children.at(0);
auto options = SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1, true);
options.analyze(data.only_analyze);
auto interpreter = InterpreterSelectWithUnionQuery(subquery_select, subquery_context, options);
return std::make_unique<InterpreterSelectWithUnionQuery>(subquery_select, subquery_context, options);
}
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
{
auto hash = subquery.getTreeHash();
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter = nullptr;
bool hit = false;
bool is_local = false;
Block scalar;
if (data.local_scalars.count(scalar_query_hash_str))
{
hit = true;
scalar = data.local_scalars[scalar_query_hash_str];
is_local = true;
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit);
}
else if (data.scalars.count(scalar_query_hash_str))
{
hit = true;
scalar = data.scalars[scalar_query_hash_str];
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
}
else
{
if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str))
{
if (!data.getContext()->getViewSource())
{
/// We aren't using storage views so we can safely use the context cache
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
hit = true;
}
else
{
/// If we are under a context that uses views that means that the cache might contain values that reference
/// the original table and not the view, so in order to be able to check the global cache we need to first
/// make sure that the query doesn't use the view
/// Note in any case the scalar will end up cached in *data* so this won't be repeated inside this context
interpreter = getQueryInterpreter(subquery, data);
if (!interpreter->usesViewSource())
{
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
hit = true;
}
}
}
}
if (!hit)
{
if (!interpreter)
interpreter = getQueryInterpreter(subquery, data);
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
is_local = interpreter->usesViewSource();
Block block;
if (data.only_analyze)
{
/// If query is only analyzed, then constants are not correct.
block = interpreter.getSampleBlock();
block = interpreter->getSampleBlock();
for (auto & column : block)
{
if (column.column->empty())
@ -118,14 +180,14 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
}
else
{
auto io = interpreter.execute();
auto io = interpreter->execute();
PullingAsyncPipelineExecutor executor(io.pipeline);
while (block.rows() == 0 && executor.pull(block));
if (block.rows() == 0)
{
auto types = interpreter.getSampleBlock().getDataTypes();
auto types = interpreter->getSampleBlock().getDataTypes();
if (types.size() != 1)
types = {std::make_shared<DataTypeTuple>(types)};
@ -218,6 +280,9 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
ast = std::move(func);
}
if (is_local)
data.local_scalars[scalar_query_hash_str] = std::move(scalar);
else
data.scalars[scalar_query_hash_str] = std::move(scalar);
}

View File

@ -19,11 +19,8 @@ struct ASTTableExpression;
*
* Features
*
* A replacement occurs during query analysis, and not during the main runtime.
* This means that the progress indicator will not work during the execution of these requests,
* and also such queries can not be aborted.
*
* But the query result can be used for the index in the table.
* A replacement occurs during query analysis, and not during the main runtime, so
* the query result can be used for the index in the table.
*
* Scalar subqueries are executed on the request-initializer server.
* The request is sent to remote servers with already substituted constants.
@ -37,6 +34,7 @@ public:
{
size_t subquery_depth;
Scalars & scalars;
Scalars & local_scalars;
bool only_analyze;
};

View File

@ -159,8 +159,8 @@ static void setLazyExecutionInfo(
const ActionsDAGReverseInfo::NodeInfo & node_info = reverse_info.nodes_info[reverse_info.reverse_index.at(node)];
/// If node is used in result, we can't enable lazy execution.
if (node_info.used_in_result)
/// If node is used in result or it doesn't have parents, we can't enable lazy execution.
if (node_info.used_in_result || node_info.parents.empty())
lazy_execution_info.can_be_lazy_executed = false;
/// To fill lazy execution info for current node we need to create it for all it's parents.

View File

@ -40,6 +40,15 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
/// Returns whether the query uses the view source from the Context
/// The view source is a virtual storage that currently only materialized views use to replace the source table
/// with the incoming block only
/// This flag is useful to know for how long we can cache scalars generated by this query: If it doesn't use the virtual storage
/// then we can cache the scalars forever (for any query that doesn't use the virtual storage either), but if it does use the virtual
/// storage then we can only keep the scalar result around while we are working with that source block
/// You can find more details about this under ExecuteScalarSubqueriesMatcher::visit
bool usesViewSource() { return uses_view_source; }
protected:
ASTPtr query_ptr;
ContextMutablePtr context;
@ -48,6 +57,7 @@ protected:
size_t max_streams = 1;
bool settings_limit_offset_needed = false;
bool settings_limit_offset_done = false;
bool uses_view_source = false;
};
}

View File

@ -68,7 +68,10 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
nested_interpreters.resize(num_children);
for (size_t i = 0; i < num_children; ++i)
{
nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i));
uses_view_source |= nested_interpreters[i]->usesViewSource();
}
Blocks headers(num_children);
for (size_t query_num = 0; query_num < num_children; ++query_num)

View File

@ -64,8 +64,9 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageView.h>
#include <Functions/IFunction.h>
@ -315,6 +316,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (!has_input && !storage)
{
storage = joined_tables.getLeftTableStorage();
// Mark uses_view_source if the returned storage is the same as the one saved in viewSource
uses_view_source |= storage && storage == context->getViewSource();
got_storage_from_query = true;
}
@ -336,6 +339,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
joined_tables.reset(getSelectQuery());
joined_tables.resolveTables();
if (auto view_source = context->getViewSource())
{
// If we are using a virtual block view to replace a table and that table is used
// inside the JOIN then we need to update uses_view_source accordingly so we avoid propagating scalars that we can't cache
const auto & storage_values = static_cast<const StorageValues &>(*view_source);
auto tmp_table_id = storage_values.getStorageID();
for (const auto & t : joined_tables.tablesWithColumns())
uses_view_source |= (t.table.database == tmp_table_id.database_name && t.table.table == tmp_table_id.table_name);
}
if (storage && joined_tables.isLeftTableSubquery())
{
@ -351,7 +363,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
interpreter_subquery = joined_tables.makeLeftTableSubquery(options.subquery());
if (interpreter_subquery)
{
source_header = interpreter_subquery->getSampleBlock();
uses_view_source |= interpreter_subquery->usesViewSource();
}
}
joined_tables.rewriteDistributedInAndJoins(query_ptr);
@ -389,9 +404,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query.setFinal();
/// Save scalar sub queries's results in the query context
/// But discard them if the Storage has been modified
/// In an ideal situation we would only discard the scalars affected by the storage change
if (!options.only_analyze && context->hasQueryContext() && !context->getViewSource())
/// Note that we are only saving scalars and not local_scalars since the latter can't be safely shared across contexts
if (!options.only_analyze && context->hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())
context->getQueryContext()->addScalar(it.first, it.second);
@ -479,6 +493,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// If there is an aggregation in the outer query, WITH TOTALS is ignored in the subquery.
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
uses_view_source |= interpreter_subquery->usesViewSource();
}
required_columns = syntax_analyzer_result->requiredSourceColumns();

View File

@ -138,6 +138,9 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
nested_interpreters.emplace_back(
buildCurrentChildInterpreter(ast->list_of_selects->children.at(query_num), require_full_header ? Names() : current_required_result_column_names));
// We need to propagate the uses_view_source flag from children to the (self) parent since, if one of the children uses
// a view source that means that the parent uses it too and can be cached globally
uses_view_source |= nested_interpreters.back()->usesViewSource();
}
/// Determine structure of the result.

View File

@ -479,10 +479,11 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
}
/// Replacing scalar subqueries with constant values.
void executeScalarSubqueries(ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, bool only_analyze)
void executeScalarSubqueries(
ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, Scalars & local_scalars, bool only_analyze)
{
LogAST log;
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, only_analyze};
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, local_scalars, only_analyze};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
}
@ -1158,7 +1159,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze);
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, result.local_scalars, select_options.only_analyze);
if (settings.legacy_column_name_of_tuple_literal)
markTupleLiteralsAsLegacy(query);
@ -1248,7 +1249,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, getContext(), 0, result.scalars, !execute_scalar_subqueries);
executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries);
if (settings.legacy_column_name_of_tuple_literal)
markTupleLiteralsAsLegacy(query);

View File

@ -75,6 +75,7 @@ struct TreeRewriterResult
/// Results of scalar sub queries
Scalars scalars;
Scalars local_scalars;
explicit TreeRewriterResult(
const NamesAndTypesList & source_columns_,

View File

@ -22,38 +22,20 @@ namespace ErrorCodes
ASTPtr ASTSelectQuery::clone() const
{
auto res = std::make_shared<ASTSelectQuery>(*this);
/** NOTE Members must clone exactly in the same order in which they were inserted into `children` in ParserSelectQuery.
* This is important because the AST hash depends on the children order and this hash is used for multiple things,
* like the column identifiers in the case of subqueries in the IN statement or caching scalar queries (reused in CTEs so it's
* important for them to have the same hash).
* For distributed query processing, in case one of the servers is localhost and the other one is not, localhost query is executed
* within the process and is cloned, and the request is sent to the remote server in text form via TCP.
* And if the cloning order does not match the parsing order then different servers will get different identifiers.
*
* Since the positions map uses <key, position> we can copy it as is and ensure the new children array is created / pushed
* in the same order as the existing one */
res->children.clear();
res->positions.clear();
#define CLONE(expr) res->setExpression(expr, getExpression(expr, true))
/** NOTE Members must clone exactly in the same order,
* in which they were inserted into `children` in ParserSelectQuery.
* This is important because of the children's names the identifier (getTreeHash) is compiled,
* which can be used for column identifiers in the case of subqueries in the IN statement.
* For distributed query processing, in case one of the servers is localhost and the other one is not,
* localhost query is executed within the process and is cloned,
* and the request is sent to the remote server in text form via TCP.
* And if the cloning order does not match the parsing order,
* then different servers will get different identifiers.
*/
CLONE(Expression::WITH);
CLONE(Expression::SELECT);
CLONE(Expression::TABLES);
CLONE(Expression::PREWHERE);
CLONE(Expression::WHERE);
CLONE(Expression::GROUP_BY);
CLONE(Expression::HAVING);
CLONE(Expression::WINDOW);
CLONE(Expression::ORDER_BY);
CLONE(Expression::LIMIT_BY_OFFSET);
CLONE(Expression::LIMIT_BY_LENGTH);
CLONE(Expression::LIMIT_BY);
CLONE(Expression::LIMIT_OFFSET);
CLONE(Expression::LIMIT_LENGTH);
CLONE(Expression::SETTINGS);
#undef CLONE
for (const auto & child : children)
res->children.push_back(child->clone());
return res;
}

View File

@ -121,7 +121,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
/// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
if (key_begin != key_end)
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;

View File

@ -24,7 +24,9 @@ RemoteInserter::RemoteInserter(
const String & query_,
const Settings & settings_,
const ClientInfo & client_info_)
: connection(connection_), query(query_)
: connection(connection_)
, query(query_)
, server_revision(connection.getServerRevision(timeouts))
{
ClientInfo modified_client_info = client_info_;
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;

View File

@ -35,12 +35,14 @@ public:
~RemoteInserter();
const Block & getHeader() const { return header; }
UInt64 getServerRevision() const { return server_revision; }
private:
Connection & connection;
String query;
Block header;
bool finished = false;
UInt64 server_revision;
};
}

View File

@ -132,6 +132,7 @@ namespace
struct DistributedHeader
{
UInt64 revision = 0;
Settings insert_settings;
std::string insert_query;
ClientInfo client_info;
@ -166,9 +167,8 @@ namespace
/// Read the parts of the header.
ReadBufferFromString header_buf(header_data);
UInt64 initiator_revision;
readVarUInt(initiator_revision, header_buf);
if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision)
readVarUInt(distributed_header.revision, header_buf);
if (DBMS_TCP_PROTOCOL_VERSION < distributed_header.revision)
{
LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
}
@ -177,7 +177,7 @@ namespace
distributed_header.insert_settings.read(header_buf);
if (header_buf.hasPendingData())
distributed_header.client_info.read(header_buf, initiator_revision);
distributed_header.client_info.read(header_buf, distributed_header.revision);
if (header_buf.hasPendingData())
{
@ -188,10 +188,12 @@ namespace
if (header_buf.hasPendingData())
{
NativeReader header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION);
NativeReader header_block_in(header_buf, distributed_header.revision);
distributed_header.block_header = header_block_in.read();
if (!distributed_header.block_header)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read header from the {} batch. Data was written with protocol version {}, current version: {}",
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
}
/// Add handling new data here, for example:
@ -264,10 +266,10 @@ namespace
return nullptr;
}
void writeAndConvert(RemoteInserter & remote, ReadBufferFromFile & in)
void writeAndConvert(RemoteInserter & remote, const DistributedHeader & distributed_header, ReadBufferFromFile & in)
{
CompressedReadBuffer decompressing_in(in);
NativeReader block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
NativeReader block_in(decompressing_in, distributed_header.revision);
while (Block block = block_in.read())
{
@ -304,7 +306,7 @@ namespace
{
LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
writeAndConvert(remote, in);
writeAndConvert(remote, distributed_header, in);
return;
}
@ -314,14 +316,20 @@ namespace
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
writeAndConvert(remote, in);
writeAndConvert(remote, distributed_header, in);
return;
}
/// If connection does not use compression, we have to uncompress the data.
if (!compression_expected)
{
writeAndConvert(remote, in);
writeAndConvert(remote, distributed_header, in);
return;
}
if (distributed_header.revision != remote.getServerRevision())
{
writeAndConvert(remote, distributed_header, in);
return;
}
@ -915,10 +923,10 @@ public:
{
in = std::make_unique<ReadBufferFromFile>(file_name);
decompressing_in = std::make_unique<CompressedReadBuffer>(*in);
block_in = std::make_unique<NativeReader>(*decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
log = &Poco::Logger::get("DirectoryMonitorSource");
readDistributedHeader(*in, log);
auto distributed_header = readDistributedHeader(*in, log);
block_in = std::make_unique<NativeReader>(*decompressing_in, distributed_header.revision);
first_block = block_in->read();
}
@ -1040,7 +1048,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeReader block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
NativeReader block_in(decompressing_in, distributed_header.revision);
while (Block block = block_in.read())
{

View File

@ -385,13 +385,18 @@ public:
const String & bucket,
const String & key,
size_t min_upload_part_size,
size_t upload_part_size_multiply_factor,
size_t upload_part_size_multiply_parts_count_threshold,
size_t max_single_part_upload_size)
: SinkToStorage(sample_block_)
, sample_block(sample_block_)
, format_settings(format_settings_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
std::make_unique<WriteBufferFromS3>(
client, bucket, key, min_upload_part_size,
upload_part_size_multiply_factor, upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
}
@ -440,6 +445,8 @@ public:
const String & bucket_,
const String & key_,
size_t min_upload_part_size_,
size_t upload_part_size_multiply_factor_,
size_t upload_part_size_multiply_parts_count_threshold_,
size_t max_single_part_upload_size_)
: PartitionedSink(partition_by, context_, sample_block_)
, format(format_)
@ -450,6 +457,8 @@ public:
, bucket(bucket_)
, key(key_)
, min_upload_part_size(min_upload_part_size_)
, upload_part_size_multiply_factor(upload_part_size_multiply_factor_)
, upload_part_size_multiply_parts_count_threshold(upload_part_size_multiply_parts_count_threshold_)
, max_single_part_upload_size(max_single_part_upload_size_)
, format_settings(format_settings_)
{
@ -473,6 +482,8 @@ public:
partition_bucket,
partition_key,
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size
);
}
@ -487,6 +498,8 @@ private:
const String bucket;
const String key;
size_t min_upload_part_size;
size_t upload_part_size_multiply_factor;
size_t upload_part_size_multiply_parts_count_threshold;
size_t max_single_part_upload_size;
std::optional<FormatSettings> format_settings;
@ -527,6 +540,8 @@ StorageS3::StorageS3(
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 upload_part_size_multiply_factor_,
UInt64 upload_part_size_multiply_parts_count_threshold_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
@ -543,6 +558,8 @@ StorageS3::StorageS3(
, format_name(format_name_)
, max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, upload_part_size_multiply_factor(upload_part_size_multiply_factor_)
, upload_part_size_multiply_parts_count_threshold(upload_part_size_multiply_parts_count_threshold_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
, name(uri_.storage_name)
@ -669,6 +686,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
client_auth.uri.bucket,
keys.back(),
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size);
}
else
@ -712,6 +731,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
client_auth.uri.bucket,
keys.back(),
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size);
}
}
@ -923,7 +944,10 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
S3::URI s3_uri(Poco::URI(configuration.url));
auto max_single_read_retries = args.getLocalContext()->getSettingsRef().s3_max_single_read_retries;
auto min_upload_part_size = args.getLocalContext()->getSettingsRef().s3_min_upload_part_size;
auto upload_part_size_multiply_factor = args.getLocalContext()->getSettingsRef().s3_upload_part_size_multiply_factor;
auto upload_part_size_multiply_parts_count_threshold = args.getLocalContext()->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold;
auto max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
auto max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
ASTPtr partition_by;
@ -938,6 +962,8 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
configuration.format,
max_single_read_retries,
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size,
max_connections,
args.columns,

View File

@ -126,6 +126,8 @@ public:
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 upload_part_size_multiply_factor_,
UInt64 upload_part_size_multiply_parts_count_threshold_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
@ -193,6 +195,8 @@ private:
String format_name;
UInt64 max_single_read_retries;
size_t min_upload_part_size;
size_t upload_part_size_multiply_factor;
size_t upload_part_size_multiply_parts_count_threshold;
size_t max_single_part_upload_size;
String compression_method;
String name;

View File

@ -36,6 +36,14 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NETWORK_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
static bool urlWithGlobs(const String & uri)
{
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|| uri.find('|') != std::string::npos;
}
@ -62,6 +70,7 @@ IStorageURLBase::IStorageURLBase(
, partition_by(partition_by_)
{
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_);
@ -69,52 +78,12 @@ IStorageURLBase::IStorageURLBase(
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
ColumnsDescription IStorageURLBase::getTableStructureFromData(
const String & format,
const String & uri,
const String & compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
auto parsed_uri = Poco::URI(uri);
Poco::Net::HTTPBasicCredentials credentials;
std::string user_info = parsed_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
auto read_buffer_creator = [&]()
{
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
parsed_uri,
Poco::Net::HTTPRequest::HTTP_GET,
nullptr,
ConnectionTimeouts::getHTTPTimeouts(context),
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter()),
chooseCompressionMethod(parsed_uri.getPath(), compression_method));
};
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
namespace
{
@ -163,6 +132,20 @@ namespace
reader->cancel();
}
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
const auto & user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
}
StorageURLSource(
URIInfoPtr uri_info_,
const std::string & http_method,
@ -177,7 +160,8 @@ namespace
const ConnectionTimeouts & timeouts,
const String & compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {})
const URIParams & params = {},
bool glob_url = false)
: SourceWithProgress(sample_block), name(std::move(name_))
, uri_info(uri_info_)
{
@ -186,27 +170,26 @@ namespace
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
{
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
if (uri_options.size() > 1)
{
auto request_uri = Poco::URI(*option);
read_buf = getFirstAvailableURLReadBuffer(
uri_options, context, params, http_method,
callback, timeouts, compression_method, credentials, headers);
}
else
{
ReadSettings read_settings = context->getReadSettings();
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs;
auto request_uri = Poco::URI(uri_options[0]);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
try
{
std::string user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
setCredentials(credentials, request_uri);
/// Get first alive uri.
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
@ -216,24 +199,15 @@ namespace
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter()),
context->getRemoteHostFilter(),
/* delay_initiliazation */true,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
{
if (uri_options.size() == 1)
throw;
if (option == uri_options.end() - 1)
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
error_message << *option << " error: " << getCurrentExceptionMessage(false) << "\n";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
QueryPipelineBuilder builder;
@ -283,6 +257,65 @@ namespace
}
}
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
const std::vector<String> & urls,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
const String & compression_method,
Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
for (auto option = urls.begin(); option != urls.end(); ++option)
{
bool skip_url_not_found_error = read_settings.http_skip_not_found_url_for_globs && option == std::prev(urls.end());
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
setCredentials(credentials, request_uri);
try
{
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */false,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
if (urls.size() == 1)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", urls.size(), first_exception_message);
}
private:
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
@ -297,7 +330,7 @@ namespace
/// have R/W access to reader pointer.
std::mutex reader_mutex;
Poco::Net::HTTPBasicCredentials credentials{};
Poco::Net::HTTPBasicCredentials credentials;
};
}
@ -407,6 +440,71 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
}
ColumnsDescription IStorageURLBase::getTableStructureFromData(
const String & format,
const String & uri,
const String & compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
ReadBufferCreator read_buffer_creator;
Poco::Net::HTTPBasicCredentials credentials;
if (urlWithGlobs(uri))
{
std::vector<String> urls_to_check;
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
for (const auto & description : uri_descriptions)
{
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
read_buffer_creator = [&, urls_to_check]()
{
return StorageURLSource::getFirstAvailableURLReadBuffer(
urls_to_check,
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
credentials,
headers);
};
}
else
{
read_buffer_creator = [&]()
{
auto parsed_uri = Poco::URI(uri);
StorageURLSource::setCredentials(credentials, parsed_uri);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
parsed_uri,
Poco::Net::HTTPRequest::HTTP_GET,
nullptr,
ConnectionTimeouts::getHTTPTimeouts(context),
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */true),
chooseCompressionMethod(parsed_uri.getPath(), compression_method));
};
}
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
Pipe IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -417,10 +515,8 @@ Pipe IStorageURLBase::read(
unsigned num_streams)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|| uri.find('|') != std::string::npos;
if (with_globs)
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
@ -452,7 +548,7 @@ Pipe IStorageURLBase::read(
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method, headers, params, /* glob_url */true));
}
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -148,6 +148,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
S3::URI s3_uri (uri);
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 upload_part_size_multiply_factor = context->getSettingsRef().s3_upload_part_size_multiply_factor;
UInt64 upload_part_size_multiply_parts_count_threshold = context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
@ -163,6 +165,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
s3_configuration->format,
max_single_read_retries,
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size,
max_connections,
getActualTableStructure(context),

View File

@ -109,6 +109,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
/// Actually this parameters are not used
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 upload_part_size_multiply_factor = context->getSettingsRef().s3_upload_part_size_multiply_factor;
UInt64 upload_part_size_multiply_parts_count_threshold = context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
storage = StorageS3::create(
@ -119,6 +121,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
format,
max_single_read_retries,
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size,
max_connections,
getActualTableStructure(context),

View File

@ -0,0 +1,12 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,39 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node_shard = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
node_dist = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], image='yandex/clickhouse-server',
tag='21.11.9.1', stay_alive=True, with_installed_binary=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node_shard.query("CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id")
node_dist.query("CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id")
node_dist.query("CREATE TABLE dist_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, rand())")
yield cluster
finally:
cluster.shutdown()
def test_distributed_in_tuple(started_cluster):
node_dist.query("SYSTEM STOP DISTRIBUTED SENDS dist_table")
node_dist.query("INSERT INTO dist_table VALUES (1, 'foo')")
assert node_dist.query("SELECT count() FROM dist_table") == "0\n"
assert node_shard.query("SELECT count() FROM local_table") == "0\n"
node_dist.restart_with_latest_version(signal=9)
node_dist.query("SYSTEM FLUSH DISTRIBUTED dist_table")
assert node_dist.query("SELECT count() FROM dist_table") == "1\n"
assert node_shard.query("SELECT count() FROM local_table") == "1\n"

View File

@ -13,11 +13,19 @@ node2 = cluster.add_instance('node2', main_configs=['configs/named_collections.x
def started_cluster():
try:
cluster.start()
node1.query("CREATE DATABASE test")
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def setup_teardown():
print("PostgreSQL is available - running test")
yield # run test
node1.query("DROP DATABASE test")
node1.query("CREATE DATABASE test")
def test_postgres_select_insert(started_cluster):
cursor = started_cluster.postgres_conn.cursor()
table_name = 'test_many'
@ -143,11 +151,11 @@ def test_non_default_scema(started_cluster):
cursor.execute('INSERT INTO test_schema.test_table SELECT i FROM generate_series(0, 99) as t(i)')
node1.query('''
CREATE TABLE test_pg_table_schema (a UInt32)
CREATE TABLE test.test_pg_table_schema (a UInt32)
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema');
''')
result = node1.query('SELECT * FROM test_pg_table_schema')
result = node1.query('SELECT * FROM test.test_pg_table_schema')
expected = node1.query('SELECT number FROM numbers(100)')
assert(result == expected)
@ -160,10 +168,10 @@ def test_non_default_scema(started_cluster):
cursor.execute('INSERT INTO "test.nice.schema"."test.nice.table" SELECT i FROM generate_series(0, 99) as t(i)')
node1.query('''
CREATE TABLE test_pg_table_schema_with_dots (a UInt32)
CREATE TABLE test.test_pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test.nice.table', 'postgres', 'mysecretpassword', 'test.nice.schema');
''')
result = node1.query('SELECT * FROM test_pg_table_schema_with_dots')
result = node1.query('SELECT * FROM test.test_pg_table_schema_with_dots')
assert(result == expected)
cursor.execute('INSERT INTO "test_schema"."test_table" SELECT i FROM generate_series(100, 199) as t(i)')
@ -173,8 +181,8 @@ def test_non_default_scema(started_cluster):
cursor.execute('DROP SCHEMA test_schema CASCADE')
cursor.execute('DROP SCHEMA "test.nice.schema" CASCADE')
node1.query('DROP TABLE test_pg_table_schema')
node1.query('DROP TABLE test_pg_table_schema_with_dots')
node1.query('DROP TABLE test.test_pg_table_schema')
node1.query('DROP TABLE test.test_pg_table_schema_with_dots')
def test_concurrent_queries(started_cluster):
@ -302,19 +310,19 @@ def test_postgres_distributed(started_cluster):
def test_datetime_with_timezone(started_cluster):
cursor = started_cluster.postgres_conn.cursor()
cursor.execute("DROP TABLE IF EXISTS test_timezone")
node1.query("DROP TABLE IF EXISTS test_timezone")
node1.query("DROP TABLE IF EXISTS test.test_timezone")
cursor.execute("CREATE TABLE test_timezone (ts timestamp without time zone, ts_z timestamp with time zone)")
cursor.execute("insert into test_timezone select '2014-04-04 20:00:00', '2014-04-04 20:00:00'::timestamptz at time zone 'America/New_York';")
cursor.execute("select * from test_timezone")
result = cursor.fetchall()[0]
logging.debug(f'{result[0]}, {str(result[1])[:-6]}')
node1.query("create table test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_timezone', 'postgres', 'mysecretpassword');")
assert(node1.query("select ts from test_timezone").strip() == str(result[0]))
node1.query("create table test.test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_timezone', 'postgres', 'mysecretpassword');")
assert(node1.query("select ts from test.test_timezone").strip() == str(result[0]))
# [:-6] because 2014-04-04 16:00:00+00:00 -> 2014-04-04 16:00:00
assert(node1.query("select ts_z from test_timezone").strip() == str(result[1])[:-6])
assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
assert(node1.query("select ts_z from test.test_timezone").strip() == str(result[1])[:-6])
assert(node1.query("select * from test.test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
cursor.execute("DROP TABLE test_timezone")
node1.query("DROP TABLE test_timezone")
node1.query("DROP TABLE test.test_timezone")
def test_postgres_ndim(started_cluster):
@ -342,20 +350,20 @@ def test_postgres_on_conflict(started_cluster):
cursor.execute(f'CREATE TABLE {table} (a integer PRIMARY KEY, b text, c integer)')
node1.query('''
CREATE TABLE test_conflict (a UInt32, b String, c Int32)
CREATE TABLE test.test_conflict (a UInt32, b String, c Int32)
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_conflict', 'postgres', 'mysecretpassword', '', 'ON CONFLICT DO NOTHING');
''')
node1.query(f''' INSERT INTO {table} SELECT number, concat('name_', toString(number)), 3 from numbers(100)''')
node1.query(f''' INSERT INTO {table} SELECT number, concat('name_', toString(number)), 4 from numbers(100)''')
node1.query(f''' INSERT INTO test.{table} SELECT number, concat('name_', toString(number)), 3 from numbers(100)''')
node1.query(f''' INSERT INTO test.{table} SELECT number, concat('name_', toString(number)), 4 from numbers(100)''')
check1 = f"SELECT count() FROM {table}"
check1 = f"SELECT count() FROM test.{table}"
assert (node1.query(check1)).rstrip() == '100'
table_func = f'''postgresql('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres', '{table}', 'postgres', 'mysecretpassword', '', 'ON CONFLICT DO NOTHING')'''
node1.query(f'''INSERT INTO TABLE FUNCTION {table_func} SELECT number, concat('name_', toString(number)), 3 from numbers(100)''')
node1.query(f'''INSERT INTO TABLE FUNCTION {table_func} SELECT number, concat('name_', toString(number)), 3 from numbers(100)''')
check1 = f"SELECT count() FROM {table}"
check1 = f"SELECT count() FROM test.{table}"
assert (node1.query(check1)).rstrip() == '100'
cursor.execute(f'DROP TABLE {table} ')
@ -367,48 +375,48 @@ def test_predefined_connection_configuration(started_cluster):
cursor.execute(f'CREATE TABLE test_table (a integer PRIMARY KEY, b integer)')
node1.query('''
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (a UInt32, b Int32)
DROP TABLE IF EXISTS test.test_table;
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1);
''')
node1.query(f''' INSERT INTO test_table SELECT number, number from numbers(100)''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
node1.query(f''' INSERT INTO test.test_table SELECT number, number from numbers(100)''')
assert (node1.query(f"SELECT count() FROM test.test_table").rstrip() == '100')
node1.query('''
DROP TABLE test_table;
CREATE TABLE test_table (a UInt32, b Int32)
DROP TABLE test.test_table;
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1, on_conflict='ON CONFLICT DO NOTHING');
''')
node1.query(f''' INSERT INTO test_table SELECT number, number from numbers(100)''')
node1.query(f''' INSERT INTO test_table SELECT number, number from numbers(100)''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
node1.query(f''' INSERT INTO test.test_table SELECT number, number from numbers(100)''')
node1.query(f''' INSERT INTO test.test_table SELECT number, number from numbers(100)''')
assert (node1.query(f"SELECT count() FROM test.test_table").rstrip() == '100')
node1.query('DROP TABLE test_table;')
node1.query('DROP TABLE test.test_table;')
node1.query_and_get_error('''
CREATE TABLE test_table (a UInt32, b Int32)
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1, 'ON CONFLICT DO NOTHING');
''')
node1.query_and_get_error('''
CREATE TABLE test_table (a UInt32, b Int32)
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres2);
''')
node1.query_and_get_error('''
CREATE TABLE test_table (a UInt32, b Int32)
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(unknown_collection);
''')
node1.query('''
CREATE TABLE test_table (a UInt32, b Int32)
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres1, port=5432, database='postgres', table='test_table');
''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
assert (node1.query(f"SELECT count() FROM test.test_table").rstrip() == '100')
node1.query('''
DROP TABLE test_table;
CREATE TABLE test_table (a UInt32, b Int32)
DROP TABLE test.test_table;
CREATE TABLE test.test_table (a UInt32, b Int32)
ENGINE PostgreSQL(postgres3, port=5432);
''')
assert (node1.query(f"SELECT count() FROM test_table").rstrip() == '100')
assert (node1.query(f"SELECT count() FROM test.test_table").rstrip() == '100')
assert (node1.query(f"SELECT count() FROM postgresql(postgres1)").rstrip() == '100')
node1.query("INSERT INTO TABLE FUNCTION postgresql(postgres1, on_conflict='ON CONFLICT DO NOTHING') SELECT number, number from numbers(100)")

View File

@ -0,0 +1,11 @@
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# default values test
${CLICKHOUSE_CLIENT} --query "SELECT 1"
# backward compatibility test
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${CLICKHOUSE_PORT_TCP}" --query "SELECT 1";
not_resolvable_host="notlocalhost"
exception_msg="Cannot resolve host (${not_resolvable_host}), error 0: ${not_resolvable_host}.
Code: 198. DB::Exception: Not found address of host: ${not_resolvable_host}. (DNS_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" "${not_resolvable_host}" --query "SELECT 1" 2>&1 > /dev/null)";
[ "${error}" == "${exception_msg}" ]; echo "$?"
not_number_port="abc"
exception_msg="Bad arguments: the argument ('${CLICKHOUSE_HOST}:${not_number_port}') for option '--host' is invalid."
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_number_port}" --query "SELECT 1" 2>&1 > /dev/null)";
[ "${error}" == "${exception_msg}" ]; echo "$?"
not_alive_host="10.100.0.0"
${CLICKHOUSE_CLIENT} --host "${not_alive_host}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
not_alive_port="1"
exception_msg="Code: 210. DB::NetException: Connection refused (${CLICKHOUSE_HOST}:${not_alive_port}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_alive_port}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_TCP}" --port "${not_alive_port}" --query "SELECT 1";
ipv6_host_without_brackets="2001:3984:3989::1:1000"
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_without_brackets}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_without_brackets}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"
ipv6_host_with_brackets="[2001:3984:3989::1:1000]"
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_with_brackets}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_with_brackets}:${not_alive_port}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}:${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"

View File

@ -0,0 +1,24 @@
0 Value
{
"meta":
[
{
"name": "dictGet(02154_test_dictionary, 'value', toUInt64(0))",
"type": "String"
},
{
"name": "dictGet(02154_test_dictionary, 'value', toUInt64(1))",
"type": "String"
}
],
"data":
[
{
"dictGet(02154_test_dictionary, 'value', toUInt64(0))": "Value",
"dictGet(02154_test_dictionary, 'value', toUInt64(1))": ""
}
],
"rows": 1
}

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 02154_test_source_table"
$CLICKHOUSE_CLIENT -q """
CREATE TABLE 02154_test_source_table
(
id UInt64,
value String
) ENGINE=TinyLog;
"""
$CLICKHOUSE_CLIENT -q "INSERT INTO 02154_test_source_table VALUES (0, 'Value')"
$CLICKHOUSE_CLIENT -q "SELECT * FROM 02154_test_source_table"
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY IF EXISTS 02154_test_dictionary"
$CLICKHOUSE_CLIENT -q """
CREATE DICTIONARY 02154_test_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(HASHED())
LIFETIME(0)
SOURCE(CLICKHOUSE(TABLE '02154_test_source_table'))
"""
echo """
SELECT dictGet(02154_test_dictionary, 'value', toUInt64(0)), dictGet(02154_test_dictionary, 'value', toUInt64(1))
FORMAT JSON
""" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&wait_end_of_query=1&output_format_write_statistics=0" -d @-
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY 02154_test_dictionary"
$CLICKHOUSE_CLIENT -q "DROP TABLE 02154_test_source_table"

View File

@ -0,0 +1,2 @@
02177_CTE_GLOBAL_ON 5 500 11 0 5
02177_CTE_GLOBAL_OFF 1 100 5 0 1

View File

@ -0,0 +1,48 @@
WITH
( SELECT sleep(0.0001) FROM system.one ) as a1,
( SELECT sleep(0.0001) FROM system.one ) as a2,
( SELECT sleep(0.0001) FROM system.one ) as a3,
( SELECT sleep(0.0001) FROM system.one ) as a4,
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_GLOBAL_ON', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS enable_global_with_statement = 1;
WITH
( SELECT sleep(0.0001) FROM system.one ) as a1,
( SELECT sleep(0.0001) FROM system.one ) as a2,
( SELECT sleep(0.0001) FROM system.one ) as a3,
( SELECT sleep(0.0001) FROM system.one ) as a4,
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_GLOBAL_OFF', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS enable_global_with_statement = 0;
SYSTEM FLUSH LOGS;
SELECT
'02177_CTE_GLOBAL_ON',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%SELECT ''02177_CTE_GLOBAL_ON%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
SELECT
'02177_CTE_GLOBAL_OFF',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;

View File

@ -0,0 +1,63 @@
4 4 4 4 5
9 9 9 9 5
14 14 14 14 5
19 19 19 19 5
24 24 24 24 5
29 29 29 29 5
34 34 34 34 5
39 39 39 39 5
44 44 44 44 5
49 49 49 49 5
54 54 54 54 5
59 59 59 59 5
64 64 64 64 5
69 69 69 69 5
74 74 74 74 5
79 79 79 79 5
84 84 84 84 5
89 89 89 89 5
94 94 94 94 5
99 99 99 99 5
02177_MV 7 80 22
10
40
70
100
130
160
190
220
250
280
310
340
370
400
430
460
490
520
550
580
02177_MV_2 0 0 21
8
18
28
38
48
58
68
78
88
98
108
118
128
138
148
158
168
178
188
198
02177_MV_3 19 0 2

View File

@ -0,0 +1,133 @@
-- TEST CACHE
CREATE TABLE t1 (i Int64, j Int64) ENGINE = Memory;
INSERT INTO t1 SELECT number, number FROM system.numbers LIMIT 100;
CREATE TABLE t2 (k Int64, l Int64, m Int64, n Int64) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv1 TO t2 AS
WITH
(SELECT max(i) FROM t1) AS t1
SELECT
t1 as k, -- Using local cache x 4
t1 as l,
t1 as m,
t1 as n
FROM t1
LIMIT 5;
-- FIRST INSERT
INSERT INTO t1
WITH
(SELECT max(i) FROM t1) AS t1
SELECT
number as i,
t1 + t1 + t1 AS j -- Using global cache
FROM system.numbers
LIMIT 100
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SELECT k, l, m, n, count()
FROM t2
GROUP BY k, l, m, n
ORDER BY k, l, m, n;
SYSTEM FLUSH LOGS;
-- The main query should have a cache miss and 3 global hits
-- The MV is executed 20 times (100 / 5) and each run does 1 miss and 4 hits to the LOCAL cache
-- In addition to this, to prepare the MV, there is an extra preparation to get the list of columns via
-- InterpreterSelectQuery, which adds 1 miss and 4 global hits (since it uses the global cache)
-- So in total we have:
-- Main query: 1 miss, 3 global
-- Preparation: 1 miss, 4 global
-- Blocks (20): 20 miss, 0 global, 80 local hits
-- TOTAL: 22 miss, 7 global, 80 local
SELECT
'02177_MV',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- FIRST INSERT\nINSERT INTO t1\n%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv1;
CREATE TABLE t3 (z Int64) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv2 TO t3 AS
SELECT
-- This includes an unnecessarily complex query to verify that the local cache is used (since it uses t1)
sum(i) + sum(j) + (SELECT * FROM (SELECT min(i) + min(j) FROM (SELECT * FROM system.one _a, t1 _b))) AS z
FROM t1;
-- SECOND INSERT
INSERT INTO t1
SELECT 0 as i, number as j from numbers(100)
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SELECT * FROM t3 ORDER BY z ASC;
SYSTEM FLUSH LOGS;
SELECT
'02177_MV_2',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- SECOND INSERT\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv2;
CREATE TABLE t4 (z Int64) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv3 TO t4 AS
SELECT
-- This includes an unnecessarily complex query but now it uses t2 so it can be cached
min(i) + min(j) + (SELECT * FROM (SELECT min(k) + min(l) FROM (SELECT * FROM system.one _a, t2 _b))) AS z
FROM t1;
-- THIRD INSERT
INSERT INTO t1
SELECT number as i, number as j from numbers(100)
SETTINGS
min_insert_block_size_rows=5,
max_insert_block_size=5,
min_insert_block_size_rows_for_materialized_views=5,
max_block_size=5,
max_threads=1;
SYSTEM FLUSH LOGS;
SELECT * FROM t4 ORDER BY z ASC;
SELECT
'02177_MV_3',
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '-- THIRD INSERT\nINSERT INTO t1%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
DROP TABLE mv3;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
DROP TABLE t4;

View File

@ -0,0 +1,2 @@
SELECT 1 FROM (SELECT arrayJoin(if(empty(range(number)), [1], [2])) from numbers(1));