Merge branch 'master' into ignore-attach-thread-keeper-errors

This commit is contained in:
Antonio Andelic 2022-09-26 08:38:48 +02:00 committed by GitHub
commit c60d9db687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
76 changed files with 1137 additions and 418 deletions

View File

@ -163,7 +163,7 @@
* Fix `base58Encode / base58Decode` handling leading 0 / '1'. [#40620](https://github.com/ClickHouse/ClickHouse/pull/40620) ([Andrey Zvonov](https://github.com/zvonand)).
* keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix short circuit execution of toFixedString function. Solves (partially) [#40622](https://github.com/ClickHouse/ClickHouse/issues/40622). [#40628](https://github.com/ClickHouse/ClickHouse/pull/40628) ([Kruglov Pavel](https://github.com/Avogar)).
* - Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
* Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
* Fix stack overflow in recursive `Buffer` tables. This closes [#40637](https://github.com/ClickHouse/ClickHouse/issues/40637). [#40643](https://github.com/ClickHouse/ClickHouse/pull/40643) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)).
* Fix LOGICAL_ERROR with max_read_buffer_size=0 during reading marks. [#40705](https://github.com/ClickHouse/ClickHouse/pull/40705) ([Azat Khuzhin](https://github.com/azat)).
@ -174,7 +174,7 @@
* In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix incremental backups for Log family. [#40827](https://github.com/ClickHouse/ClickHouse/pull/40827) ([Vitaly Baranov](https://github.com/vitlibar)).
* Fix extremely rare bug which can lead to potential data loss in zero-copy replication. [#40844](https://github.com/ClickHouse/ClickHouse/pull/40844) ([alesapin](https://github.com/alesapin)).
* - Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
* Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
* Fix nested JSON Objects schema inference. [#40851](https://github.com/ClickHouse/ClickHouse/pull/40851) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix 3-digit prefix directory for filesystem cache files not being deleted if empty. Closes [#40797](https://github.com/ClickHouse/ClickHouse/issues/40797). [#40867](https://github.com/ClickHouse/ClickHouse/pull/40867) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Fix uncaught DNS_ERROR on failed connection to replicas. [#40881](https://github.com/ClickHouse/ClickHouse/pull/40881) ([Robert Coelho](https://github.com/coelho)).

View File

@ -1,75 +0,0 @@
#pragma once
#include <map>
#include <tuple>
#include <mutex>
#include "FnTraits.h"
/**
* Caching proxy for a functor that decays to a pointer-to-function.
* Saves pairs (func args, func result on args).
* Cache size is unlimited. Cache items are evicted only on manual drop.
* Invocation/update is O(log(saved cache values)).
*
* See Common/tests/cached_fn.cpp for examples.
*/
template <auto * Func>
struct CachedFn
{
private:
using Traits = FnTraits<decltype(Func)>;
using DecayedArgs = TypeListMap<std::decay_t, typename Traits::Args>;
using Key = TypeListChangeRoot<std::tuple, DecayedArgs>;
using Result = typename Traits::Ret;
std::map<Key, Result> cache; // Can't use hashmap as tuples are unhashable by default
mutable std::mutex mutex;
public:
template <class ...Args>
Result operator()(Args && ...args)
{
Key key{std::forward<Args>(args)...};
{
std::lock_guard lock(mutex);
if (auto it = cache.find(key); it != cache.end())
return it->second;
}
Result res = std::apply(Func, key);
{
std::lock_guard lock(mutex);
cache.emplace(std::move(key), res);
}
return res;
}
template <class ...Args>
void update(Args && ...args)
{
Key key{std::forward<Args>(args)...};
Result res = std::apply(Func, key);
{
std::lock_guard lock(mutex);
// TODO Can't use emplace(std::move(key), ..), causes test_host_ip_change errors.
cache[key] = std::move(res);
}
}
size_t size() const
{
std::lock_guard lock(mutex);
return cache.size();
}
void drop()
{
std::lock_guard lock(mutex);
cache.clear();
}
};

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 9fec8e11dbb6a352e1cfba8cc9e23ebd7fb77310
Subproject commit 76746b35d0e254eaaba71dc3b79e46cba8cbb144

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="22.8.5.29"
ARG VERSION="22.9.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="22.8.5.29"
ARG VERSION="22.9.2.7"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -243,7 +243,7 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
configure
# But we still need default disk because some tables loaded only into it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<disk>s3</disk>|<disk>s3</disk><disk>default</disk>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.9.2.7-stable (362e2cefcef) FIXME as compared to v22.9.1.2603-stable (3030d4c7ff0)
#### Improvement
* Backported in [#41709](https://github.com/ClickHouse/ClickHouse/issues/41709): Check file path for path traversal attacks in errors logger for input formats. [#41694](https://github.com/ClickHouse/ClickHouse/pull/41694) ([Kruglov Pavel](https://github.com/Avogar)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#41696](https://github.com/ClickHouse/ClickHouse/issues/41696): Fixes issue when docker run will fail if "https_port" is not present in config. [#41693](https://github.com/ClickHouse/ClickHouse/pull/41693) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Fix typos in JSON formats after [#40910](https://github.com/ClickHouse/ClickHouse/issues/40910) [#41614](https://github.com/ClickHouse/ClickHouse/pull/41614) ([Kruglov Pavel](https://github.com/Avogar)).

View File

@ -131,7 +131,7 @@ Example of configuration for versions later or equal to 22.8:
<type>cache</type>
<disk>s3</disk>
<path>/s3_cache/</path>
<max_size>10000000</max_size>
<max_size>10Gi</max_size>
</cache>
</disks>
<policies>
@ -155,7 +155,7 @@ Example of configuration for versions earlier than 22.8:
<endpoint>...</endpoint>
... s3 configuration ...
<data_cache_enabled>1</data_cache_enabled>
<data_cache_max_size>10000000</data_cache_max_size>
<data_cache_max_size>10737418240</data_cache_max_size>
</s3>
</disks>
<policies>
@ -172,7 +172,7 @@ Cache **configuration settings**:
- `path` - path to the directory with cache. Default: None, this setting is obligatory.
- `max_size` - maximum size of the cache in bytes. When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
- `max_size` - maximum size of the cache in bytes or in readable format, e.g. `ki, Mi, Gi, etc`, example `10Gi` (such format works starting from `22.10` version). When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
- `cache_on_write_operations` - allow to turn on `write-through` cache (caching data on any write operations: `INSERT` queries, background merges). Default: `false`. The `write-through` cache can be disabled per query using setting `enable_filesystem_cache_on_write_operations` (data is cached only if both cache config settings and corresponding query setting are enabled).
@ -182,7 +182,7 @@ Cache **configuration settings**:
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `false`. This setting was added in version 22.8. If you used filesystem cache before this version, then it will not work on versions starting from 22.8 if this setting is set to `true`. If you want to use this setting, clear old cache created before version 22.8 before upgrading.
- `max_file_segment_size` - a maximum size of a single cache file. Default: `104857600` (100 Mb).
- `max_file_segment_size` - a maximum size of a single cache file in bytes or in readable format (`ki, Mi, Gi, etc`, example `10Gi`). Default: `104857600` (`100Mi`).
- `max_elements` - a limit for a number of cache files. Default: `1048576`.

View File

@ -179,6 +179,9 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
{
disconnect();
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
throw NetException(e.displayText() + " (" + getDescription() + ")", ErrorCodes::NETWORK_ERROR);
}
@ -186,6 +189,9 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
{
disconnect();
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
/// This exception can only be thrown from socket->connect(), so add information about connection timeout.
const auto & connection_timeout = static_cast<bool>(secure) ? timeouts.secure_connection_timeout : timeouts.connection_timeout;

View File

@ -141,7 +141,7 @@ static void testIndex()
size_t index_rows = rng() % MAX_ROWS + 1;
test_case(rows, index_rows, 0);
test_case(rows, index_rows, 0.5 * index_rows);
test_case(rows, index_rows, static_cast<size_t>(0.5 * index_rows));
}
}
catch (const Exception & e)

View File

@ -1,7 +1,8 @@
#include "DNSResolver.h"
#include <base/CachedFn.h>
#include <Common/CacheBase.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/thread_local_rng.h>
#include <Core/Names.h>
#include <base/types.h>
#include <Poco/Net/IPAddress.h>
@ -12,6 +13,7 @@
#include <atomic>
#include <optional>
#include <string_view>
#include <unordered_set>
#include "DNSPTRResolverProvider.h"
namespace ProfileEvents
@ -41,9 +43,11 @@ namespace ErrorCodes
extern const int DNS_ERROR;
}
namespace
{
/// Slightly altered implementation from https://github.com/pocoproject/poco/blob/poco-1.6.1/Net/src/SocketAddress.cpp#L86
static void splitHostAndPort(const std::string & host_and_port, std::string & out_host, UInt16 & out_port)
void splitHostAndPort(const std::string & host_and_port, std::string & out_host, UInt16 & out_port)
{
String port_str;
out_host.clear();
@ -84,7 +88,7 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou
throw Exception("Port must be numeric", ErrorCodes::BAD_ARGUMENTS);
}
static DNSResolver::IPAddresses hostByName(const std::string & host)
DNSResolver::IPAddresses hostByName(const std::string & host)
{
/// Do not resolve IPv6 (or IPv4) if no local IPv6 (or IPv4) addresses are configured.
/// It should not affect client address checking, since client cannot connect from IPv6 address
@ -112,7 +116,7 @@ static DNSResolver::IPAddresses hostByName(const std::string & host)
return addresses;
}
static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
{
Poco::Net::IPAddress ip;
@ -136,7 +140,13 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
return addresses;
}
static std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
DNSResolver::IPAddresses resolveIPAddressWithCache(CacheBase<std::string, DNSResolver::IPAddresses> & cache, const std::string & host)
{
auto [result, _ ] = cache.getOrSet(host, [&host]() { return std::make_shared<DNSResolver::IPAddresses>(resolveIPAddressImpl(host)); });
return *result;
}
std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
{
auto ptr_resolver = DB::DNSPTRResolverProvider::get();
@ -149,13 +159,27 @@ static std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress
}
}
std::unordered_set<String> reverseResolveWithCache(
CacheBase<Poco::Net::IPAddress, std::unordered_set<std::string>> & cache, const Poco::Net::IPAddress & address)
{
auto [result, _ ] = cache.getOrSet(address, [&address]() { return std::make_shared<std::unordered_set<String>>(reverseResolveImpl(address)); });
return *result;
}
Poco::Net::IPAddress pickAddress(const DNSResolver::IPAddresses & addresses)
{
return addresses.front();
}
}
struct DNSResolver::Impl
{
using HostWithConsecutiveFailures = std::unordered_map<String, UInt32>;
using AddressWithConsecutiveFailures = std::unordered_map<Poco::Net::IPAddress, UInt32>;
CachedFn<&resolveIPAddressImpl> cache_host;
CachedFn<&reverseResolveImpl> cache_address;
CacheBase<std::string, DNSResolver::IPAddresses> cache_host{100};
CacheBase<Poco::Net::IPAddress, std::unordered_set<std::string>> cache_address{100};
std::mutex drop_mutex;
std::mutex update_mutex;
@ -180,7 +204,7 @@ DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()), log(&P
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
return resolveHostAll(host).front();
return pickAddress(resolveHostAll(host));
}
DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
@ -189,7 +213,7 @@ DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
return resolveIPAddressImpl(host);
addToNewHosts(host);
return impl->cache_host(host);
return resolveIPAddressWithCache(impl->cache_host, host);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port)
@ -202,7 +226,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_an
splitHostAndPort(host_and_port, host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
@ -211,7 +235,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
return Poco::Net::SocketAddress(host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
}
std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std::string & host, UInt16 port)
@ -224,7 +248,7 @@ std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std:
if (!impl->disable_cache)
addToNewHosts(host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : impl->cache_host(host);
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(impl->cache_host, host);
auto ips_end = std::unique(ips.begin(), ips.end());
addresses.reserve(ips_end - ips.begin());
@ -240,13 +264,13 @@ std::unordered_set<String> DNSResolver::reverseResolve(const Poco::Net::IPAddres
return reverseResolveImpl(address);
addToNewAddresses(address);
return impl->cache_address(address);
return reverseResolveWithCache(impl->cache_address, address);
}
void DNSResolver::dropCache()
{
impl->cache_host.drop();
impl->cache_address.drop();
impl->cache_host.reset();
impl->cache_address.reset();
std::scoped_lock lock(impl->update_mutex, impl->drop_mutex);
@ -257,6 +281,11 @@ void DNSResolver::dropCache()
impl->host_name.reset();
}
void DNSResolver::removeHostFromCache(const std::string & host)
{
impl->cache_host.remove(host);
}
void DNSResolver::setDisableCacheFlag(bool is_disabled)
{
impl->disable_cache = is_disabled;
@ -378,17 +407,20 @@ bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
bool DNSResolver::updateHost(const String & host)
{
/// Usage of updateHost implies that host is already in cache and there is no extra computations
auto old_value = impl->cache_host(host);
impl->cache_host.update(host);
return old_value != impl->cache_host(host);
const auto old_value = resolveIPAddressWithCache(impl->cache_host, host);
auto new_value = resolveIPAddressImpl(host);
const bool result = old_value != new_value;
impl->cache_host.set(host, std::make_shared<DNSResolver::IPAddresses>(std::move(new_value)));
return result;
}
bool DNSResolver::updateAddress(const Poco::Net::IPAddress & address)
{
auto old_value = impl->cache_address(address);
impl->cache_address.update(address);
return old_value == impl->cache_address(address);
const auto old_value = reverseResolveWithCache(impl->cache_address, address);
auto new_value = reverseResolveImpl(address);
const bool result = old_value != new_value;
impl->cache_address.set(address, std::make_shared<std::unordered_set<String>>(std::move(new_value)));
return result;
}
void DNSResolver::addToNewHosts(const String & host)

View File

@ -18,6 +18,7 @@ class DNSResolver : private boost::noncopyable
{
public:
using IPAddresses = std::vector<Poco::Net::IPAddress>;
using IPAddressesPtr = std::shared_ptr<IPAddresses>;
static DNSResolver & instance();
@ -48,6 +49,9 @@ public:
/// Drops all caches
void dropCache();
/// Removes an entry from cache or does nothing
void removeHostFromCache(const std::string & host);
/// Updates all known hosts in cache.
/// Returns true if IP of any host has been changed or an element was dropped (too many failures)
bool updateCache(UInt32 max_consecutive_failures);

View File

@ -0,0 +1,48 @@
#include <Common/EventNotifier.h>
#include <Common/Exception.h>
#include <boost/functional/hash.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::unique_ptr<EventNotifier> EventNotifier::event_notifier;
EventNotifier & EventNotifier::init()
{
if (event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is initialized twice. This is a bug.");
event_notifier = std::make_unique<EventNotifier>();
return *event_notifier;
}
EventNotifier & EventNotifier::instance()
{
if (!event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is not initialized. This is a bug.");
return *event_notifier;
}
void EventNotifier::shutdown()
{
if (event_notifier)
event_notifier.reset();
}
size_t EventNotifier::calculateIdentifier(size_t a, size_t b)
{
size_t result = 0;
boost::hash_combine(result, a);
boost::hash_combine(result, b);
return result;
}
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <vector>
#include <mutex>
#include <functional>
#include <set>
#include <map>
#include <memory>
#include <utility>
#include <iostream>
#include <base/types.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
class EventNotifier
{
public:
struct Handler
{
Handler(
EventNotifier & parent_,
size_t event_id_,
size_t callback_id_)
: parent(parent_)
, event_id(event_id_)
, callback_id(callback_id_)
{}
~Handler()
{
std::lock_guard lock(parent.mutex);
parent.callback_table[event_id].erase(callback_id);
parent.storage.erase(callback_id);
}
private:
EventNotifier & parent;
size_t event_id;
size_t callback_id;
};
using HandlerPtr = std::shared_ptr<Handler>;
static EventNotifier & init();
static EventNotifier & instance();
static void shutdown();
template <typename EventType, typename Callback>
[[ nodiscard ]] HandlerPtr subscribe(EventType event, Callback && callback)
{
std::lock_guard lock(mutex);
auto event_id = DefaultHash64(event);
auto callback_id = calculateIdentifier(event_id, ++counter);
callback_table[event_id].insert(callback_id);
storage[callback_id] = std::forward<Callback>(callback);
return std::make_shared<Handler>(*this, event_id, callback_id);
}
template <typename EventType>
void notify(EventType event)
{
std::lock_guard lock(mutex);
for (const auto & identifier : callback_table[DefaultHash64(event)])
storage[identifier]();
}
private:
// To move boost include for .h file
static size_t calculateIdentifier(size_t a, size_t b);
using CallbackType = std::function<void()>;
using CallbackStorage = std::map<size_t, CallbackType>;
using EventToCallbacks = std::map<size_t, std::set<size_t>>;
std::mutex mutex;
EventToCallbacks callback_table;
CallbackStorage storage;
size_t counter{0};
static std::unique_ptr<EventNotifier> event_notifier;
};
}

View File

@ -4,14 +4,15 @@
#include <Common/Elf.h>
#include <Common/SymbolIndex.h>
#include <Common/MemorySanitizer.h>
#include <base/CachedFn.h>
#include <base/demangle.h>
#include <atomic>
#include <cstring>
#include <filesystem>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <map>
#include <Common/config.h>
@ -462,20 +463,36 @@ std::string StackTrace::toString(void ** frame_pointers_, size_t offset, size_t
return toStringStatic(frame_pointers_copy, offset, size);
}
static CachedFn<&toStringImpl> & cacheInstance()
using StackTraceRepresentation = std::tuple<StackTrace::FramePointers, size_t, size_t>;
using StackTraceCache = std::map<StackTraceRepresentation, std::string>;
static StackTraceCache & cacheInstance()
{
static CachedFn<&toStringImpl> cache;
static StackTraceCache cache;
return cache;
}
static std::mutex stacktrace_cache_mutex;
std::string StackTrace::toStringStatic(const StackTrace::FramePointers & frame_pointers, size_t offset, size_t size)
{
/// Calculation of stack trace text is extremely slow.
/// We use simple cache because otherwise the server could be overloaded by trash queries.
return cacheInstance()(frame_pointers, offset, size);
/// Note that this cache can grow unconditionally, but practically it should be small.
std::lock_guard lock{stacktrace_cache_mutex};
StackTraceRepresentation key{frame_pointers, offset, size};
auto & cache = cacheInstance();
if (cache.contains(key))
return cache[key];
auto result = toStringImpl(frame_pointers, offset, size);
cache[key] = result;
return result;
}
void StackTrace::dropCache()
{
cacheInstance().drop();
std::lock_guard lock{stacktrace_cache_mutex};
cacheInstance().clear();
}

View File

@ -139,12 +139,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
}
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
zk_log = std::move(zk_log_);
init(args_);
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{

View File

@ -1,15 +1,16 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <Common/EventNotifier.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h>
#include <Common/config.h>
@ -874,7 +875,11 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
/// No new requests will appear in queue after finish()
bool was_already_finished = requests_queue.finish();
if (!was_already_finished)
{
active_session_metric_increment.destroy();
/// Notify all subscribers (ReplicatedMergeTree tables) about expired session
EventNotifier::instance().notify(Error::ZSESSIONEXPIRED);
}
};
try

View File

@ -1,54 +0,0 @@
#include <gtest/gtest.h>
#include <thread>
#include <chrono>
#include <base/CachedFn.h>
using namespace std::chrono_literals;
constexpr int add(int x, int y)
{
return x + y;
}
int longFunction(int x, int y)
{
std::this_thread::sleep_for(1s);
return x + y;
}
auto f = [](int x, int y) { return x - y; };
TEST(CachedFn, Basic)
{
CachedFn<&add> fn;
const int res = fn(1, 2);
EXPECT_EQ(fn(1, 2), res);
/// In GCC, lambda can't be placed in TEST, producing "<labmda> has no linkage".
/// Assuming http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4268.html,
/// this is a GCC bug.
CachedFn<+f> fn2;
const int res2 = fn2(1, 2);
EXPECT_EQ(fn2(1, 2), res2);
}
TEST(CachedFn, CachingResults)
{
CachedFn<&longFunction> fn;
for (int x = 0; x < 2; ++x)
{
for (int y = 0; y < 2; ++y)
{
const int res = fn(x, y);
const time_t start = time(nullptr);
for (int count = 0; count < 1000; ++count)
EXPECT_EQ(fn(x, y), res);
EXPECT_LT(time(nullptr) - start, 10);
}
}
}

View File

@ -0,0 +1,53 @@
#include <gtest/gtest.h>
#include <Common/EventNotifier.h>
#include <Common/ZooKeeper/IKeeper.h>
TEST(EventNotifier, SimpleTest)
{
using namespace DB;
size_t result = 1;
EventNotifier::init();
auto handler3 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 3; });
{
auto handler5 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 5; });
}
auto handler7 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 7; });
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 21);
result = 1;
handler3.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 7);
auto handler11 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 11; });
result = 1;
handler7.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 11);
EventNotifier::HandlerPtr handler13;
{
handler13 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 13; });
}
result = 1;
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 143);
result = 1;
handler11.reset();
handler13.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 1);
EventNotifier::shutdown();
}

View File

@ -245,8 +245,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
credentials.setPassword(named_collection->configuration.password);
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & header : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>()));
for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(key, value));
}
else
{

View File

@ -240,7 +240,6 @@ CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
{
auto download_state = file_segment->state();
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{
@ -251,7 +250,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
}
else
{
LOG_DEBUG(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
LOG_TEST(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(*file_segment, read_type);
}
@ -263,7 +262,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
{
case FileSegment::State::SKIP_CACHE:
{
LOG_DEBUG(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
LOG_TRACE(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(*file_segment, read_type);
}
@ -358,7 +357,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
}
else
{
LOG_DEBUG(
LOG_TRACE(
log,
"Bypassing cache because file segment state is `PARTIALLY_DOWNLOADED_NO_CONTINUATION` and downloaded part already used");
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
@ -658,7 +657,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
implementation_buffer->setReadUntilPosition(file_segment->range().right + 1); /// [..., range.right]
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
LOG_TEST(
LOG_TRACE(
log,
"Predownload failed because of space limit. "
"Will read from remote filesystem starting from offset: {}",
@ -786,10 +785,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
assertCorrectness();
if (file_offset_of_buffer_end == read_until_position)
{
LOG_TEST(log, "Read finished on offset {}", file_offset_of_buffer_end);
return false;
}
if (!initialized)
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
@ -813,10 +809,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
{
LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader());
file_segment->completePartAndResetDownloader();
}
}
chassert(!file_segment->isDownloader());
@ -956,12 +949,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
else
{
chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because writeCache method failed");
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
}
}
else
{
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
LOG_TRACE(log, "No space left in cache, will continue without cache download");
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}

View File

@ -2,6 +2,7 @@
#include <Disks/IDisk.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <IO/WriteHelpers.h>
@ -53,7 +54,31 @@ const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
{
return object_storage.files.contains(path);
fs::path fs_path(path);
if (fs_path.has_extension())
fs_path = fs_path.parent_path();
initializeIfNeeded(fs_path, false);
if (object_storage.files.empty())
return false;
if (object_storage.files.contains(path))
return true;
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
auto it = std::lower_bound(
object_storage.files.begin(),
object_storage.files.end(),
path,
[](const auto & file, const std::string & path_) { return file.first < path_; }
);
if (startsWith(it->first, path)
|| (it != object_storage.files.begin() && startsWith(std::prev(it)->first, path)))
return true;
return false;
}
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
@ -98,7 +123,10 @@ uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & pat
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
{
assertExists(path);
return {StoredObject::create(object_storage, path, object_storage.files.at(path).size, true)};
auto fs_path = fs::path(object_storage.url) / path;
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
}
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const
@ -112,7 +140,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
return result;
}
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
{
if (object_storage.files.find(path) == object_storage.files.end())
{
@ -123,7 +151,7 @@ bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::stri
catch (...)
{
const auto message = getCurrentExceptionMessage(false);
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
if (can_throw)
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
@ -140,13 +168,17 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
std::vector<fs::path> dir_file_paths;
if (!initializeIfNeeded(path))
{
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
}
assertExists(path);
for (const auto & [file_path, _] : object_storage.files)
if (parentPath(file_path) == path)
{
if (fs::path(parentPath(file_path)) / "" == fs::path(path) / "")
dir_file_paths.emplace_back(file_path);
}
LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size());
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));

View File

@ -19,7 +19,7 @@ private:
void assertExists(const std::string & path) const;
bool initializeIfNeeded(const std::string & path) const;
bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
public:
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);

View File

@ -30,7 +30,6 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
extern const int NETWORK_ERROR;
}
@ -153,20 +152,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
std::optional<size_t>,
std::optional<size_t>) const
{
const auto & path = object.absolute_path;
LOG_TRACE(log, "Read from path: {}", path);
auto iter = files.find(path);
if (iter == files.end())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
auto fs_path = fs::path(url) / path;
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.string().substr(url.size());
StoredObjects objects;
objects.emplace_back(remote_path, iter->second.size);
auto read_buffer_creator =
[this, read_settings]
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
@ -179,7 +164,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings);
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{

View File

@ -114,7 +114,7 @@ protected:
size_t size = 0;
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data
using Files = std::map<String, FileData>; /// file path -> file data
mutable Files files;
String url;

View File

@ -39,6 +39,7 @@ StoragePolicy::StoragePolicy(
const String & config_prefix,
DiskSelectorPtr disks)
: name(std::move(name_))
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
{
Poco::Util::AbstractConfiguration::Keys keys;
String volumes_prefix = config_prefix + ".volumes";
@ -81,11 +82,15 @@ StoragePolicy::StoragePolicy(
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
buildVolumeIndices();
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
}
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
: volumes(std::move(volumes_))
, name(std::move(name_))
, move_factor(move_factor_)
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
{
if (volumes.empty())
throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@ -94,6 +99,7 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
buildVolumeIndices();
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
}
@ -206,12 +212,16 @@ UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
ReservationPtr StoragePolicy::reserve(UInt64 bytes, size_t min_volume_index) const
{
LOG_TRACE(log, "Reserving bytes {} from volume index {}, total volumes {}", bytes, min_volume_index, volumes.size());
for (size_t i = min_volume_index; i < volumes.size(); ++i)
{
const auto & volume = volumes[i];
auto reservation = volume->reserve(bytes);
if (reservation)
{
LOG_TRACE(log, "Successfully reserved {} bytes on volume index {}", bytes, i);
return reservation;
}
}
return {};
}

View File

@ -104,6 +104,8 @@ private:
double move_factor = 0.1; /// by default move factor is 10%
void buildVolumeIndices();
Poco::Logger * log;
};

View File

@ -1,3 +1,4 @@
#include "Common/DNSResolver.h"
#include <Common/config.h>
#if USE_AWS_S3
@ -257,6 +258,9 @@ void PocoHTTPClient::makeRequestInternal(
if (!request_configuration.proxy_host.empty())
{
if (enable_s3_requests_logging)
LOG_TEST(log, "Due to reverse proxy host name ({}) won't be resolved on ClickHouse side", uri);
/// Reverse proxy can replace host header with resolved ip address instead of host name.
/// This can lead to request signature difference on S3 side.
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
@ -274,6 +278,8 @@ void PocoHTTPClient::makeRequestInternal(
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
}
/// In case of error this address will be written to logs
request.SetResolvedRemoteHost(session->getResolvedAddress());
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
@ -441,6 +447,10 @@ void PocoHTTPClient::makeRequestInternal(
response->SetClientErrorMessage(getCurrentExceptionMessage(false));
addMetric(request, S3MetricType::Errors);
/// Probably this is socket timeout or something more or less related to DNS
/// Let's just remove this host from DNS cache to be more safe
DNSResolver::instance().removeHostFromCache(Poco::URI(uri).getHost());
}
}

View File

@ -114,6 +114,7 @@ struct URI
bool is_virtual_hosted_style;
explicit URI(const Poco::URI & uri_);
explicit URI(const std::string & uri_) : URI(Poco::URI(uri_)) {}
static void validateBucket(const String & bucket, const Poco::URI & uri);
};

View File

@ -2,6 +2,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <IO/ReadHelpers.h>
namespace DB
{
@ -16,7 +17,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
if (!config.has(config_prefix + ".max_size"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected cache size (`max_size`) in configuration");
max_size = config.getUInt64(config_prefix + ".max_size", 0);
max_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_size"));
if (max_size == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected non-zero size for cache configuration");
@ -25,7 +26,10 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk Cache requires non-empty `path` field (cache base path) in config");
max_elements = config.getUInt64(config_prefix + ".max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
if (config.has(config_prefix + ".max_file_segment_size"))
max_file_segment_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_file_segment_size"));
else
max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);

View File

@ -34,7 +34,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
LOG_TRACE(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
return std::make_shared<LRUFileCacheIterator>(this, iter);
}
@ -54,7 +54,7 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
LOG_DEBUG(log, "Removed all entries from LRU queue");
LOG_TRACE(log, "Removed all entries from LRU queue");
queue.clear();
cache_size = 0;
@ -88,7 +88,7 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
LOG_TRACE(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
queue_iter = cache_priority->queue.erase(queue_iter);
}

View File

@ -8,6 +8,7 @@
#include <Poco/Util/Application.h>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/EventNotifier.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
@ -510,6 +511,7 @@ void Context::initGlobal()
assert(!global_context_instance);
global_context_instance = shared_from_this();
DatabaseCatalog::init(shared_from_this());
EventNotifier::init();
}
SharedContextHolder Context::createShared()

View File

@ -24,6 +24,10 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
bool isPreliminary() const { return pre_distinct; }
UInt64 getLimitHint() const { return limit_hint; }
private:
void updateOutputStream() override;

View File

@ -54,16 +54,20 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
/// Update information about prefix sort description in SortingStep.
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Reading in order from MergeTree table if DISTINCT columns match or form a prefix of MergeTree sorting key
size_t tryDistinctReadInOrder(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 7> optimizations = {{
static const std::array<Optimization, 8> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryDistinctReadInOrder, "distinctReadInOrder", &QueryPlanOptimizationSettings::distinct_in_order},
}};
return optimizations;

View File

@ -11,6 +11,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.optimize_plan = from.query_plan_enable_optimizations;
settings.max_optimizations_to_apply = from.query_plan_max_optimizations_to_apply;
settings.filter_push_down = from.query_plan_filter_push_down;
settings.distinct_in_order = from.optimize_distinct_in_order;
return settings;
}

View File

@ -21,6 +21,9 @@ struct QueryPlanOptimizationSettings
/// If filter push down optimization is enabled.
bool filter_push_down = true;
/// if distinct in order optimization is enabled
bool distinct_in_order = false;
static QueryPlanOptimizationSettings fromSettings(const Settings & from);
static QueryPlanOptimizationSettings fromContext(ContextPtr from);
};

View File

@ -0,0 +1,97 @@
#include <memory>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Common/typeid_cast.h>
namespace DB::QueryPlanOptimizations
{
size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
{
/// check if it is preliminary distinct node
DistinctStep * pre_distinct = nullptr;
if (auto * distinct = typeid_cast<DistinctStep *>(parent_node->step.get()); distinct)
{
if (distinct->isPreliminary())
pre_distinct = distinct;
}
if (!pre_distinct)
return 0;
/// walk through the plan
/// (1) check if nodes below preliminary distinct preserve sorting
/// (2) gather transforming steps to update their sorting properties later
std::vector<ITransformingStep *> steps2update;
QueryPlan::Node * node = parent_node;
while (!node->children.empty())
{
auto * step = dynamic_cast<ITransformingStep *>(node->step.get());
if (!step)
return 0;
const ITransformingStep::DataStreamTraits & traits = step->getDataStreamTraits();
if (!traits.preserves_sorting)
return 0;
steps2update.push_back(step);
node = node->children.front();
}
/// check if we read from MergeTree
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(node->step.get());
if (!read_from_merge_tree)
return 0;
/// find non-const columns in DISTINCT
const ColumnsWithTypeAndName & distinct_columns = pre_distinct->getOutputStream().header.getColumnsWithTypeAndName();
std::set<std::string_view> non_const_columns;
for (const auto & column : distinct_columns)
{
if (!isColumnConst(*column.column))
non_const_columns.emplace(column.name);
}
const Names& sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
/// check if DISTINCT has the same columns as sorting key
size_t number_of_sorted_distinct_columns = 0;
for (const auto & column_name : sorting_key_columns)
{
if (non_const_columns.end() == non_const_columns.find(column_name))
break;
++number_of_sorted_distinct_columns;
}
/// apply optimization only when distinct columns match or form prefix of sorting key
/// todo: check if reading in order optimization would be beneficial when sorting key is prefix of columns in DISTINCT
if (number_of_sorted_distinct_columns != non_const_columns.size())
return 0;
/// check if another read in order optimization is already applied
/// apply optimization only if another read in order one uses less sorting columns
/// example: SELECT DISTINCT a, b FROM t ORDER BY a; -- sorting key: a, b
/// if read in order for ORDER BY is already applied, then output sort description will contain only column `a`
/// but we need columns `a, b`, applying read in order for distinct will still benefit `order by`
const DataStream & output_data_stream = read_from_merge_tree->getOutputStream();
const SortDescription & output_sort_desc = output_data_stream.sort_description;
if (output_data_stream.sort_scope != DataStream::SortScope::Chunk && number_of_sorted_distinct_columns <= output_sort_desc.size())
return 0;
/// update input order info in read_from_merge_tree step
const int direction = 0; /// for DISTINCT direction doesn't matter, ReadFromMergeTree will choose proper one
read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
/// update data stream's sorting properties for found transforms
const DataStream * input_stream = &read_from_merge_tree->getOutputStream();
while (!steps2update.empty())
{
steps2update.back()->updateInputStream(*input_stream);
input_stream = &steps2update.back()->getOutputStream();
steps2update.pop_back();
}
return 0;
}
}

View File

@ -91,8 +91,6 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
window->getWindowDescription().full_sort_description,
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
@ -103,7 +101,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info)
{
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
sorting->convertToFinishSorting(order_info->sort_description_for_merging);
}

View File

@ -153,7 +153,6 @@ ReadFromMergeTree::ReadFromMergeTree(
}
output_stream->sort_description = std::move(sort_description);
}
}
@ -1019,28 +1018,38 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
void ReadFromMergeTree::setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> order_optimizer)
void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = order_optimizer;
}
else
{
query_info.order_optimizer = order_optimizer;
}
}
/// if dirction is not set, use current one
if (!direction)
direction = getSortDirection();
void ReadFromMergeTree::setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info)
{
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
if (query_info.projection)
{
query_info.projection->input_order_info = order_info;
}
else
{
query_info.input_order_info = order_info;
/// update sort info for output stream
SortDescription sort_description;
const Names & sorting_key_columns = storage_snapshot->getMetadataForQuery()->getSortingKeyColumns();
const Block & header = output_stream->header;
const int sort_direction = getSortDirection();
for (const auto & column_name : sorting_key_columns)
{
if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; })
== header.end())
break;
sort_description.emplace_back(column_name, sort_direction);
}
if (sort_description.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Sort description can't be empty when reading in order");
const size_t used_prefix_of_sorting_key_size = order_info->used_prefix_of_sorting_key_size;
if (sort_description.size() > used_prefix_of_sorting_key_size)
sort_description.resize(used_prefix_of_sorting_key_size);
output_stream->sort_description = std::move(sort_description);
output_stream->sort_scope = DataStream::SortScope::Stream;
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const

View File

@ -151,8 +151,7 @@ public:
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
void setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> read_in_order_optimizer);
void setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info);
void requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
private:
int getSortDirection() const

View File

@ -307,7 +307,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
{
const auto header_prefix = headers_prefix + header;
configuration.headers.emplace_back(
std::make_pair(headers_config->getString(header_prefix + ".name"), headers_config->getString(header_prefix + ".value")));
headers_config->getString(header_prefix + ".name"),
headers_config->getString(header_prefix + ".value"));
}
}
@ -446,7 +447,9 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
for (const auto & header : header_keys)
{
const auto header_prefix = config_prefix + ".headers." + header;
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
configuration.headers.emplace_back(
config.getString(header_prefix + ".name"),
config.getString(header_prefix + ".value"));
}
}
else

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/HeaderCollection.h>
namespace DB
@ -108,7 +109,7 @@ struct URLBasedDataSourceConfiguration
String user;
String password;
std::vector<std::pair<String, Field>> headers;
HeaderCollection headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);

View File

@ -0,0 +1,18 @@
#pragma once
#include <string>
namespace DB
{
struct HttpHeader
{
std::string name;
std::string value;
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
}

View File

@ -100,8 +100,10 @@ struct ReplicatedFetchReadCallback
}
Service::Service(StorageReplicatedMergeTree & data_) :
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service::Service(StorageReplicatedMergeTree & data_)
: data(data_)
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Replicated PartsService)"))
{}
std::string Service::getId(const std::string & node_id) const
{
@ -444,6 +446,11 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
}
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
: data(data_)
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Fetcher)"))
{}
MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
@ -494,6 +501,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
if (disk)
{
LOG_TRACE(log, "Will fetch to disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
UInt64 revision = disk->getRevision();
if (revision)
uri.addQueryParameter("disk_revision", toString(revision));
@ -504,13 +512,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
{
if (!disk)
{
LOG_TRACE(log, "Trying to fetch with zero-copy replication, but disk is not provided, will try to select");
Disks disks = data.getDisks();
for (const auto & data_disk : disks)
{
LOG_TRACE(log, "Checking disk {} with type {}", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
if (data_disk->supportZeroCopyReplication())
{
LOG_TRACE(log, "Disk {} (with type {}) supports zero-copy replication", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
capability.push_back(toString(data_disk->getDataSourceDescription().type));
}
}
}
else if (disk->supportZeroCopyReplication())
{
LOG_TRACE(log, "Trying to fetch with zero copy replication, provided disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
capability.push_back(toString(disk->getDataSourceDescription().type));
}
}
@ -562,29 +578,47 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
if (!disk)
{
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using storage balanced reservation");
reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
{
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using TTL rules");
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
}
}
}
else if (!disk)
{
LOG_TRACE(log, "Making balanced reservation");
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
if (!reservation)
{
LOG_TRACE(log, "Making simple reservation");
reservation = data.reserveSpace(sum_files_size);
}
}
}
else if (!disk)
{
LOG_TRACE(log, "Making reservation on the largest disk");
/// We don't know real size of part because sender server version is too old
reservation = data.makeEmptyReservationOnLargestDisk();
}
if (!disk)
{
disk = reservation->getDisk();
LOG_INFO(log, "Disk for fetch is not provided, getting disk from reservation {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
}
else
{
LOG_INFO(log, "Disk for fetch is disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
}
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
if (revision)
@ -989,7 +1023,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
{
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
}
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",

View File

@ -67,7 +67,7 @@ private:
class Fetcher final : private boost::noncopyable
{
public:
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
explicit Fetcher(StorageReplicatedMergeTree & data_);
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchSelectedPart(

View File

@ -4909,12 +4909,14 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
LOG_TRACE(log, "Trying reserve {} bytes preffering TTL rules", expected_size);
ReservationPtr reservation;
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);
if (move_ttl_entry)
{
LOG_TRACE(log, "Got move TTL entry, will try to reserver destination for move");
SpacePtr destination_ptr = getDestinationForMoveTTL(*move_ttl_entry, is_insert);
if (!destination_ptr)
{
@ -4935,10 +4937,15 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
}
else
{
LOG_TRACE(log, "Reserving bytes on selected destination");
reservation = destination_ptr->reserve(expected_size);
if (reservation)
{
LOG_TRACE(log, "Reservation successful");
return reservation;
}
else
{
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME)
LOG_WARNING(
log,
@ -4951,15 +4958,22 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
"Would like to reserve space on disk '{}' by TTL rule of table '{}' but there is not enough space",
move_ttl_entry->destination_name,
*std::atomic_load(&log_name));
}
}
}
// Prefer selected_disk
if (selected_disk)
{
LOG_DEBUG(log, "Disk for reservation provided: {} (with type {})", selected_disk->getName(), toString(selected_disk->getDataSourceDescription().type));
reservation = selected_disk->reserve(expected_size);
}
if (!reservation)
{
LOG_DEBUG(log, "No reservation, reserving using storage policy from min volume index {}", min_volume_index);
reservation = getStoragePolicy()->reserve(expected_size, min_volume_index);
}
return reservation;
}

View File

@ -1467,6 +1467,8 @@ bool MutateTask::execute()
}
case State::NEED_EXECUTE:
{
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (task->executeStep())
return true;

View File

@ -202,7 +202,7 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrderImpl(
const ContextPtr & context,
UInt64 limit) const
{
auto sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
const Names & sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
int read_direction = description.at(0).direction;
auto fixed_sorting_columns = getFixedSortingColumns(query, sorting_key_columns, context);

View File

@ -4185,6 +4185,11 @@ void StorageReplicatedMergeTree::startupImpl()
/// In this thread replica will be activated.
restarting_thread.start();
/// And this is just a callback
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
{
restarting_thread.start();
});
/// Wait while restarting_thread finishing initialization.
/// NOTE It does not mean that replication is actually started after receiving this event.
@ -4228,6 +4233,8 @@ void StorageReplicatedMergeTree::shutdown()
if (shutdown_called.exchange(true))
return;
session_expired_callback_handler.reset();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();

View File

@ -29,6 +29,7 @@
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Throttler.h>
#include <Common/EventNotifier.h>
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
@ -453,6 +454,7 @@ private:
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
EventNotifier::HandlerPtr session_expired_callback_handler;
/// A thread that attaches the table using ZooKeeper
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;

View File

@ -27,6 +27,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
@ -767,33 +768,28 @@ private:
StorageS3::StorageS3(
const S3::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_,
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later
, keys({uri_.key})
, format_name(format_name_)
, compression_method(compression_method_)
, name(uri_.storage_name)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, keys({s3_configuration.uri.key})
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
, name(s3_configuration.uri.storage_name)
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
, is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos)
{
FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri);
StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, s3_configuration);
@ -1062,47 +1058,48 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
const auto & config_rw_settings = settings.rw_settings;
bool need_update_configuration = settings != S3Settings{};
if (need_update_configuration)
{
if (upd.rw_settings != settings.rw_settings)
upd.rw_settings = settings.rw_settings;
}
if (upd.rw_settings != config_rw_settings)
upd.rw_settings = settings.rw_settings;
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
if (upd.client && (!upd.access_key_id.empty() || settings.auth_settings == upd.auth_settings))
return;
Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
HeaderCollection headers;
if (upd.access_key_id.empty())
if (upd.client)
{
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
headers = settings.auth_settings.headers;
if (upd.static_configuration)
return;
if (settings.auth_settings == upd.auth_settings)
return;
}
upd.auth_settings.updateFrom(settings.auth_settings);
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
upd.auth_settings.region,
ctx->getRemoteHostFilter(),
ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false);
client_configuration.endpointOverride = upd.uri.endpoint;
client_configuration.maxConnections = upd.rw_settings.max_connections;
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
auto headers = upd.auth_settings.headers;
if (!upd.headers_from_ast.empty())
headers.insert(headers.end(), upd.headers_from_ast.begin(), upd.headers_from_ast.end());
upd.client = S3::ClientFactory::instance().create(
client_configuration,
upd.uri.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64,
upd.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
upd.auth_settings = std::move(settings.auth_settings);
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
}
@ -1155,6 +1152,10 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
@ -1184,19 +1185,23 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
}
ColumnsDescription StorageS3::getTableStructureFromData(
const String & format,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
const StorageS3Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
S3Configuration s3_configuration{
configuration.url,
configuration.auth_settings,
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
configuration.headers};
updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
return getTableStructureFromDataImpl(
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -1308,25 +1313,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext());
}
S3::URI s3_uri(Poco::URI(configuration.url));
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
args.table_id,
configuration.format,
configuration.rw_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
configuration.compression_method,
/* distributed_processing_ */false,
partition_by);
},

View File

@ -149,18 +149,13 @@ class StorageS3 : public IStorage, WithContext
{
public:
StorageS3(
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_ = "",
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
@ -189,11 +184,7 @@ public:
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static ColumnsDescription getTableStructureFromData(
const String & format,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
const StorageS3Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
@ -204,11 +195,28 @@ public:
struct S3Configuration
{
const S3::URI uri;
const String access_key_id;
const String secret_access_key;
std::shared_ptr<const Aws::S3::S3Client> client;
S3Settings::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HeaderCollection headers_from_ast;
S3Configuration(
const String & url_,
const S3Settings::AuthSettings & auth_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, rw_settings(rw_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
};
static SchemaCache & getSchemaCache(const ContextPtr & ctx);

View File

@ -46,22 +46,17 @@
namespace DB
{
StorageS3Cluster::StorageS3Cluster(
const String & filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
ContextPtr context_)
: IStorage(table_id_)
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())}
, filename(filename_)
, cluster_name(cluster_name_)
, format_name(format_name_)
, compression_method(compression_method_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata;

View File

@ -21,16 +21,11 @@ class StorageS3Cluster : public IStorage
{
public:
StorageS3Cluster(
const String & filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_);
ContextPtr context_);
std::string getName() const override { return "S3Cluster"; }

View File

@ -7,6 +7,7 @@
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/HeaderCollection.h>
namespace Poco::Util
{
@ -15,15 +16,6 @@ class AbstractConfiguration;
namespace DB
{
struct HttpHeader
{
String name;
String value;
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
struct Settings;
@ -50,6 +42,21 @@ struct S3Settings
&& use_environment_credentials == other.use_environment_credentials
&& use_insecure_imds_request == other.use_insecure_imds_request;
}
void updateFrom(const AuthSettings & from)
{
/// Update with check for emptyness only parameters which
/// can be passed not only from config, but via ast.
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
}
};
struct ReadWriteSettings
@ -94,7 +101,6 @@ struct S3Settings
class StorageS3Settings
{
public:
StorageS3Settings() = default;
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
S3Settings getSettings(const String & endpoint) const;

View File

@ -1018,7 +1018,7 @@ ASTs::iterator StorageURL::collectHeaders(
if (arg_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
configuration.headers.emplace_back(arg_name, arg_value);
configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
}
headers_it = arg_it;
@ -1096,10 +1096,9 @@ void registerStorageURL(StorageFactory & factory)
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
headers.emplace_back(header, value);
}
ASTPtr partition_by;

View File

@ -12,6 +12,7 @@
#include <Parsers/ASTLiteral.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>
@ -40,6 +41,10 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
@ -135,15 +140,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
@ -161,19 +158,14 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
columns = structure_hint;
StoragePtr storage = std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns,
ConstraintsDescription{},
String{},
context,
/// No format_settings for table function S3
std::nullopt,
configuration.compression_method);
std::nullopt);
storage->startup();

View File

@ -82,19 +82,10 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
{
context->checkAccess(getSourceAccessType());
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
}
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return parseColumnsListFromString(configuration.structure, context);
}
@ -104,46 +95,38 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage;
ColumnsDescription columns;
if (configuration.structure != "auto")
{
columns = parseColumnsListFromString(configuration.structure, context);
}
else if (!structure_hint.empty())
{
columns = structure_hint;
}
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On worker node this filename won't contains globs
Poco::URI uri (configuration.url);
S3::URI s3_uri (uri);
storage = std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns,
ConstraintsDescription{},
String{},
/* comment */String{},
context,
// No format_settings for S3Cluster
std::nullopt,
configuration.compression_method,
/* format_settings */std::nullopt, /// No format_settings for S3Cluster
/*distributed_processing=*/true);
}
else
{
storage = std::make_shared<StorageS3Cluster>(
configuration.url,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.cluster_name, configuration.format,
columns,
ConstraintsDescription{},
context,
configuration.compression_method);
context);
}
storage->startup();

View File

@ -103,10 +103,9 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
headers.emplace_back(header, value);
}
return headers;
}

View File

@ -13,9 +13,7 @@
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
<main><disk>s3</disk></main>
</volumes>
</s3>
</policies>

View File

@ -48,8 +48,8 @@
<s3_cache>
<type>cache</type>
<disk>s3_disk</disk>
<path>s3_disk_cache/</path>
<max_size>22548578304</max_size>
<path>s3_cache/</path>
<max_size>2147483648</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
</s3_cache>
@ -57,8 +57,9 @@
<type>cache</type>
<disk>s3_disk_2</disk>
<path>s3_cache_2/</path>
<max_size>22548578304</max_size>
<max_size>2Gi</max_size>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
<max_file_segment_size>100Mi</max_file_segment_size>
</s3_cache_2>
<s3_cache_3>
<type>cache</type>

View File

@ -25,19 +25,17 @@ def cluster():
global uuids
for i in range(3):
node1.query(
""" CREATE TABLE data{} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def';""".format(
i
)
f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
)
node1.query(
"INSERT INTO data{} SELECT number FROM numbers(500000 * {})".format(
i, i + 1
for _ in range(10):
node1.query(
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
)
)
expected = node1.query("SELECT * FROM data{} ORDER BY id".format(i))
expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
metadata_path = node1.query(
"SELECT data_paths FROM system.tables WHERE name='data{}'".format(i)
f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
)
metadata_path = metadata_path[
metadata_path.find("/") : metadata_path.rfind("/") + 1
@ -84,7 +82,7 @@ def test_usage(cluster, node_name):
result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i))
result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1)
assert int(result) == 5000000 * (i + 1)
result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
@ -123,7 +121,7 @@ def test_incorrect_usage(cluster):
)
result = node2.query("SELECT count() FROM test0")
assert int(result) == 500000
assert int(result) == 5000000
result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first")
assert "Table is read-only" in result
@ -169,7 +167,7 @@ def test_cache(cluster, node_name):
assert int(result) > 0
result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1)
assert int(result) == 5000000 * (i + 1)
result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)

View File

@ -3,6 +3,12 @@ import time
import os
import pytest
# FIXME Test is temporarily disabled due to flakyness
# https://github.com/ClickHouse/ClickHouse/issues/39700
pytestmark = pytest.mark.skip
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread

View File

@ -110,6 +110,10 @@ def started_cluster():
main_configs=["configs/defaultS3.xml"],
user_configs=["configs/s3_max_redirects.xml"],
)
cluster.add_instance(
"s3_non_default",
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -1689,3 +1693,22 @@ def test_schema_inference_cache(started_cluster):
test("s3")
test("url")
def test_ast_auth_headers(started_cluster):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["s3_non_default"] # type: ClickHouseInstance
filename = "test.csv"
result = instance.query_and_get_error(
f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV')"
)
assert "Forbidden Error" in result
assert "S3_ERROR" in result
result = instance.query(
f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
)
assert result.strip() == "1\t2\t3"

View File

@ -4,15 +4,10 @@
<fill_query>INSERT INTO distinct_cardinality_high SELECT number % 1e6, number % 1e4, number % 1e2 FROM numbers_mt(1e8)</fill_query>
<query>SELECT DISTINCT high FROM distinct_cardinality_high FORMAT Null</query>
<query>SELECT DISTINCT high, low FROM distinct_cardinality_high FORMAT Null</query>
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high FORMAT Null</query>
<query>SELECT DISTINCT high, medium, low FROM distinct_cardinality_high FORMAT Null</query>
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY high, medium FORMAT Null</query>
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY high FORMAT Null</query>
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY medium FORMAT Null</query>
<query>SELECT DISTINCT high, low FROM distinct_cardinality_high ORDER BY low FORMAT Null</query>
<query>SELECT DISTINCT high, medium, low FROM distinct_cardinality_high ORDER BY low FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS distinct_cardinality_high</drop_query>
@ -22,14 +17,9 @@
<query>SELECT DISTINCT low FROM distinct_cardinality_low FORMAT Null</query>
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low FORMAT Null</query>
<query>SELECT DISTINCT low, high FROM distinct_cardinality_low FORMAT Null</query>
<query>SELECT DISTINCT low, medium, high FROM distinct_cardinality_low FORMAT Null</query>
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY low, medium FORMAT Null</query>
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY low FORMAT Null</query>
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY medium FORMAT Null</query>
<query>SELECT DISTINCT low, high FROM distinct_cardinality_low ORDER BY high FORMAT Null</query>
<query>SELECT DISTINCT low, medium, high FROM distinct_cardinality_low ORDER BY high FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS distinct_cardinality_low</drop_query>
</test>

View File

@ -15,6 +15,8 @@ INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 1
ALTER TABLE partslost_0 ADD INDEX idx x TYPE tokenbf_v1(285000, 3, 12345) GRANULARITY 3;
SET mutations_sync = 2;
ALTER TABLE partslost_0 MATERIALIZE INDEX idx;
-- In worst case doesn't check anything, but it's not flaky

View File

@ -104,3 +104,9 @@ select distinct a, b, x, y from (select a, b, 1 as x, 2 as y from distinct_in_or
0 3 1 2
0 4 1 2
-- check that distinct in order returns the same result as ordinary distinct
-- check that distinct in order WITH order by returns the same result as ordinary distinct
0
-- check that distinct in order WITHOUT order by returns the same result as ordinary distinct
0
-- check that distinct in order WITHOUT order by and WITH filter returns the same result as ordinary distinct
0

View File

@ -65,11 +65,32 @@ INSERT INTO distinct_cardinality_low SELECT number % 1e1, number % 1e2, number %
drop table if exists distinct_in_order sync;
drop table if exists ordinary_distinct sync;
select '-- check that distinct in order WITH order by returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into distinct_in_order select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into ordinary_distinct select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=0;
select distinct * from distinct_in_order except select * from ordinary_distinct;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
drop table if exists distinct_in_order sync;
drop table if exists ordinary_distinct sync;
select '-- check that distinct in order WITHOUT order by returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into distinct_in_order select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into ordinary_distinct select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=0;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
drop table if exists distinct_in_order;
drop table if exists ordinary_distinct;
select '-- check that distinct in order WITHOUT order by and WITH filter returns the same result as ordinary distinct';
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into distinct_in_order select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=1;
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
insert into ordinary_distinct select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=0;
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
drop table if exists distinct_in_order;
drop table if exists ordinary_distinct;

View File

@ -15,6 +15,9 @@ DistinctSortedChunkTransform
-- distinct with primary key prefix and order by the same columns -> pre-distinct and final distinct optimization
DistinctSortedStreamTransform
DistinctSortedChunkTransform
-- distinct with primary key prefix and order by columns are prefix of distinct columns -> pre-distinct and final distinct optimization
DistinctSortedTransform
DistinctSortedChunkTransform
-- distinct with primary key prefix and order by column in distinct but non-primary key prefix -> pre-distinct and final distinct optimization
DistinctSortedTransform
DistinctSortedChunkTransform
@ -33,3 +36,48 @@ DistinctTransform
-- distinct with non-primary key prefix and order by _const_ column in distinct -> ordinary distinct
DistinctTransform
DistinctTransform
-- Check reading in order for distinct
-- disabled, distinct columns match sorting key
MergeTreeThread
-- enabled, distinct columns match sorting key
MergeTreeInOrder
-- enabled, distinct columns form prefix of sorting key
MergeTreeInOrder
-- enabled, distinct columns DON't form prefix of sorting key
MergeTreeThread
-- enabled, distinct columns contains constant columns, non-const columns form prefix of sorting key
MergeTreeInOrder
-- enabled, distinct columns contains constant columns, non-const columns match prefix of sorting key
MergeTreeInOrder
-- enabled, only part of distinct columns form prefix of sorting key
MergeTreeThread
-- enabled, check that sorting properties are propagated from ReadFromMergeTree till preliminary distinct
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
-- check that reading in order optimization for ORDER BY and DISTINCT applied correctly in the same query
-- disabled, check that sorting description for ReadFromMergeTree match ORDER BY columns
Sorting (Stream): a ASC
Sorting (Stream): a ASC
Sorting (Stream): a ASC
-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization i.e. it contains columns from DISTINCT clause
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization, but direction used from ORDER BY clause
Sorting (Stream): a DESC, b DESC
Sorting (Stream): a DESC, b DESC
Sorting (Stream): a DESC, b DESC
-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (1), - it contains columns from ORDER BY clause
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (2), - direction used from ORDER BY clause
Sorting (Stream): a DESC, b DESC
Sorting (Stream): a DESC, b DESC
Sorting (Stream): a DESC, b DESC
-- enabled, check that disabling other 'read in order' optimizations do not disable distinct in order optimization
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC
Sorting (Stream): a ASC, b ASC

View File

@ -10,11 +10,16 @@ DISABLE_OPTIMIZATION="set optimize_distinct_in_order=0"
ENABLE_OPTIMIZATION="set optimize_distinct_in_order=1"
GREP_DISTINCT="grep 'DistinctSortedChunkTransform\|DistinctSortedStreamTransform\|DistinctSortedTransform\|DistinctTransform'"
TRIM_LEADING_SPACES="sed -e 's/^[ \t]*//'"
FIND_DISTINCT="$GREP_DISTINCT | $TRIM_LEADING_SPACES"
REMOVE_NON_LETTERS="sed 's/[^a-zA-Z]//g'"
FIND_DISTINCT="$GREP_DISTINCT | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
FIND_READING_IN_ORDER="grep 'MergeTreeInOrder' | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
FIND_READING_DEFAULT="grep 'MergeTreeThread' | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
FIND_SORTING_PROPERTIES="grep 'Sorting (Stream)' | $TRIM_LEADING_SPACES"
$CLICKHOUSE_CLIENT -q "drop table if exists distinct_in_order_explain sync"
$CLICKHOUSE_CLIENT -q "create table distinct_in_order_explain (a int, b int, c int) engine=MergeTree() order by (a, b)"
$CLICKHOUSE_CLIENT -q "insert into distinct_in_order_explain select number % number, number % 5, number % 10 from numbers(1,10)"
$CLICKHOUSE_CLIENT -q "insert into distinct_in_order_explain select number % number, number % 5, number % 10 from numbers(1,10)"
$CLICKHOUSE_CLIENT -q "select '-- disable optimize_distinct_in_order'"
$CLICKHOUSE_CLIENT -q "select '-- distinct all primary key columns -> ordinary distinct'"
@ -33,6 +38,9 @@ $CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a,
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by the same columns -> pre-distinct and final distinct optimization'"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain order by a, b" | eval $FIND_DISTINCT
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by columns are prefix of distinct columns -> pre-distinct and final distinct optimization'"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain order by a" | eval $FIND_DISTINCT
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by column in distinct but non-primary key prefix -> pre-distinct and final distinct optimization'"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b, c from distinct_in_order_explain order by c" | eval $FIND_DISTINCT
@ -51,4 +59,40 @@ $CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b,
$CLICKHOUSE_CLIENT -q "select '-- distinct with non-primary key prefix and order by _const_ column in distinct -> ordinary distinct'"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b, 1 as x from distinct_in_order_explain order by x" | eval $FIND_DISTINCT
echo "-- Check reading in order for distinct"
echo "-- disabled, distinct columns match sorting key"
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$DISABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
echo "-- enabled, distinct columns match sorting key"
# read_in_order_two_level_merge_threshold is set here to avoid repeating MergeTreeInOrder in output
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
echo "-- enabled, distinct columns form prefix of sorting key"
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
echo "-- enabled, distinct columns DON't form prefix of sorting key"
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
echo "-- enabled, distinct columns contains constant columns, non-const columns form prefix of sorting key"
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct 1, a from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
echo "-- enabled, distinct columns contains constant columns, non-const columns match prefix of sorting key"
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct 1, b, a from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
echo "-- enabled, only part of distinct columns form prefix of sorting key"
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, c from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
echo "-- enabled, check that sorting properties are propagated from ReadFromMergeTree till preliminary distinct"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain plan sorting=1 select distinct b, a from distinct_in_order_explain where a > 0" | eval $FIND_SORTING_PROPERTIES
echo "-- check that reading in order optimization for ORDER BY and DISTINCT applied correctly in the same query"
ENABLE_READ_IN_ORDER="set optimize_read_in_order=1"
echo "-- disabled, check that sorting description for ReadFromMergeTree match ORDER BY columns"
$CLICKHOUSE_CLIENT -nq "$DISABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a" | eval $FIND_SORTING_PROPERTIES
echo "-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization i.e. it contains columns from DISTINCT clause"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a" | eval $FIND_SORTING_PROPERTIES
echo "-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization, but direction used from ORDER BY clause"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a DESC" | eval $FIND_SORTING_PROPERTIES
echo "-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (1), - it contains columns from ORDER BY clause"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct a from distinct_in_order_explain order by a, b" | eval $FIND_SORTING_PROPERTIES
echo "-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (2), - direction used from ORDER BY clause"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a DESC, b DESC" | eval $FIND_SORTING_PROPERTIES
echo "-- enabled, check that disabling other 'read in order' optimizations do not disable distinct in order optimization"
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;set optimize_read_in_order=0;set optimize_aggregation_in_order=0;set optimize_read_in_window_order=0;explain plan sorting=1 select distinct a,b from distinct_in_order_explain" | eval $FIND_SORTING_PROPERTIES
$CLICKHOUSE_CLIENT -q "drop table if exists distinct_in_order_explain sync"

View File

@ -1 +1,2 @@
22548578304 1048576 104857600 1 0 0 0 s3_disk_cache/ 0
2147483648 1048576 104857600 1 0 0 0 s3_cache/ 0
2147483648 1048576 104857600 0 0 0 0 s3_cache_2/ 0

View File

@ -1,3 +1,4 @@
-- Tags: no-fasttest
DESCRIBE FILESYSTEM CACHE 's3_cache';
DESCRIBE FILESYSTEM CACHE 's3_cache_2';

View File

@ -1,3 +1,5 @@
-- Tags: no-backward-compatibility-check
drop table if exists test_02381;
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;

View File

@ -0,0 +1,174 @@
JSONCompact
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
[
["� �"]
],
"rows": 1
}
JSON
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
[
{
"s": "� �"
}
],
"rows": 1
}
XML
<?xml version='1.0' encoding='UTF-8' ?>
<result>
<meta>
<columns>
<column>
<name>s</name>
<type>String</type>
</column>
</columns>
</meta>
<data>
<row>
<s>� �</s>
</row>
</data>
<rows>1</rows>
</result>
JSONColumnsWithMetadata
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
{
"s": ["� �"]
},
"rows": 1
}
JSONEachRow
{"s":"� �"}
JSONCompactEachRow
["� �"]
JSONColumns
{
"s": ["� �"]
}
JSONCompactColumns
[
["� �"]
]
JSONObjectEachRow
{
"row_1": {"s":"� �"}
}
JSONCompact
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
[
["� �"]
],
"rows": 1
}
JSON
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
[
{
"s": "� �"
}
],
"rows": 1
}
XML
<?xml version='1.0' encoding='UTF-8' ?>
<result>
<meta>
<columns>
<column>
<name>s</name>
<type>String</type>
</column>
</columns>
</meta>
<data>
<row>
<s>� �</s>
</row>
</data>
<rows>1</rows>
</result>
JSONColumnsWithMetadata
{
"meta":
[
{
"name": "s",
"type": "String"
}
],
"data":
{
"s": ["� �"]
},
"rows": 1
}
JSONEachRow
{"s":"í ¨"}
JSONCompactEachRow
["í ¨"]
JSONColumns
{
"s": ["í ¨"]
}
JSONCompactColumns
[
["í ¨"]
]
JSONObjectEachRow
{
"row_1": {"s":"í ¨"}
}

View File

@ -0,0 +1,42 @@
SET output_format_write_statistics = 0;
SET output_format_json_validate_utf8 = 1;
SELECT 'JSONCompact';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompact;
SELECT 'JSON';
SELECT '\xED\x20\xA8' AS s FORMAT JSON;
SELECT 'XML';
SELECT '\xED\x20\xA8' AS s FORMAT XML;
SELECT 'JSONColumnsWithMetadata';
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumnsWithMetadata;
SELECT 'JSONEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONEachRow;
SELECT 'JSONCompactEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactEachRow;
SELECT 'JSONColumns';
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumns;
SELECT 'JSONCompactColumns';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactColumns;
SELECT 'JSONObjectEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONObjectEachRow;
SET output_format_json_validate_utf8 = 0;
SELECT 'JSONCompact';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompact;
SELECT 'JSON';
SELECT '\xED\x20\xA8' AS s FORMAT JSON;
SELECT 'XML';
SELECT '\xED\x20\xA8' AS s FORMAT XML;
SELECT 'JSONColumnsWithMetadata';
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumnsWithMetadata;
SELECT 'JSONEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONEachRow;
SELECT 'JSONCompactEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactEachRow;
SELECT 'JSONColumns';
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumns;
SELECT 'JSONCompactColumns';
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactColumns;
SELECT 'JSONObjectEachRow';
SELECT '\xED\x20\xA8' AS s FORMAT JSONObjectEachRow;

View File

@ -1,3 +1,5 @@
v22.9.2.7-stable 2022-09-23
v22.9.1.2603-stable 2022-09-22
v22.8.5.29-lts 2022-09-13
v22.8.4.7-lts 2022-08-31
v22.8.3.13-lts 2022-08-29

1 v22.8.5.29-lts v22.9.2.7-stable 2022-09-13 2022-09-23
1 v22.9.2.7-stable 2022-09-23
2 v22.9.1.2603-stable 2022-09-22
3 v22.8.5.29-lts v22.8.5.29-lts 2022-09-13 2022-09-13
4 v22.8.4.7-lts v22.8.4.7-lts 2022-08-31 2022-08-31
5 v22.8.3.13-lts v22.8.3.13-lts 2022-08-29 2022-08-29