mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 13:32:13 +00:00
Merge branch '40907_Parameterized_views_as_table_functions' of github.com:ClickHouse/ClickHouse into 40907_Parameterized_views_as_table_functions
This commit is contained in:
commit
baa60008d2
@ -163,7 +163,7 @@
|
||||
* Fix `base58Encode / base58Decode` handling leading 0 / '1'. [#40620](https://github.com/ClickHouse/ClickHouse/pull/40620) ([Andrey Zvonov](https://github.com/zvonand)).
|
||||
* keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* Fix short circuit execution of toFixedString function. Solves (partially) [#40622](https://github.com/ClickHouse/ClickHouse/issues/40622). [#40628](https://github.com/ClickHouse/ClickHouse/pull/40628) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* - Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
|
||||
* Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
|
||||
* Fix stack overflow in recursive `Buffer` tables. This closes [#40637](https://github.com/ClickHouse/ClickHouse/issues/40637). [#40643](https://github.com/ClickHouse/ClickHouse/pull/40643) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)).
|
||||
* Fix LOGICAL_ERROR with max_read_buffer_size=0 during reading marks. [#40705](https://github.com/ClickHouse/ClickHouse/pull/40705) ([Azat Khuzhin](https://github.com/azat)).
|
||||
@ -174,7 +174,7 @@
|
||||
* In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)).
|
||||
* Fix incremental backups for Log family. [#40827](https://github.com/ClickHouse/ClickHouse/pull/40827) ([Vitaly Baranov](https://github.com/vitlibar)).
|
||||
* Fix extremely rare bug which can lead to potential data loss in zero-copy replication. [#40844](https://github.com/ClickHouse/ClickHouse/pull/40844) ([alesapin](https://github.com/alesapin)).
|
||||
* - Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
|
||||
* Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
|
||||
* Fix nested JSON Objects schema inference. [#40851](https://github.com/ClickHouse/ClickHouse/pull/40851) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix 3-digit prefix directory for filesystem cache files not being deleted if empty. Closes [#40797](https://github.com/ClickHouse/ClickHouse/issues/40797). [#40867](https://github.com/ClickHouse/ClickHouse/pull/40867) ([Kseniia Sumarokova](https://github.com/kssenii)).
|
||||
* Fix uncaught DNS_ERROR on failed connection to replicas. [#40881](https://github.com/ClickHouse/ClickHouse/pull/40881) ([Robert Coelho](https://github.com/coelho)).
|
||||
|
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit 9fec8e11dbb6a352e1cfba8cc9e23ebd7fb77310
|
||||
Subproject commit 76746b35d0e254eaaba71dc3b79e46cba8cbb144
|
@ -141,7 +141,7 @@ static void testIndex()
|
||||
size_t index_rows = rng() % MAX_ROWS + 1;
|
||||
|
||||
test_case(rows, index_rows, 0);
|
||||
test_case(rows, index_rows, 0.5 * index_rows);
|
||||
test_case(rows, index_rows, static_cast<size_t>(0.5 * index_rows));
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
|
48
src/Common/EventNotifier.cpp
Normal file
48
src/Common/EventNotifier.cpp
Normal file
@ -0,0 +1,48 @@
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
std::unique_ptr<EventNotifier> EventNotifier::event_notifier;
|
||||
|
||||
EventNotifier & EventNotifier::init()
|
||||
{
|
||||
if (event_notifier)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is initialized twice. This is a bug.");
|
||||
|
||||
event_notifier = std::make_unique<EventNotifier>();
|
||||
|
||||
return *event_notifier;
|
||||
}
|
||||
|
||||
EventNotifier & EventNotifier::instance()
|
||||
{
|
||||
if (!event_notifier)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is not initialized. This is a bug.");
|
||||
|
||||
return *event_notifier;
|
||||
}
|
||||
|
||||
void EventNotifier::shutdown()
|
||||
{
|
||||
if (event_notifier)
|
||||
event_notifier.reset();
|
||||
}
|
||||
|
||||
size_t EventNotifier::calculateIdentifier(size_t a, size_t b)
|
||||
{
|
||||
size_t result = 0;
|
||||
boost::hash_combine(result, a);
|
||||
boost::hash_combine(result, b);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
92
src/Common/EventNotifier.h
Normal file
92
src/Common/EventNotifier.h
Normal file
@ -0,0 +1,92 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <iostream>
|
||||
|
||||
#include <base/types.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class EventNotifier
|
||||
{
|
||||
public:
|
||||
struct Handler
|
||||
{
|
||||
Handler(
|
||||
EventNotifier & parent_,
|
||||
size_t event_id_,
|
||||
size_t callback_id_)
|
||||
: parent(parent_)
|
||||
, event_id(event_id_)
|
||||
, callback_id(callback_id_)
|
||||
{}
|
||||
|
||||
~Handler()
|
||||
{
|
||||
std::lock_guard lock(parent.mutex);
|
||||
|
||||
parent.callback_table[event_id].erase(callback_id);
|
||||
parent.storage.erase(callback_id);
|
||||
}
|
||||
|
||||
private:
|
||||
EventNotifier & parent;
|
||||
size_t event_id;
|
||||
size_t callback_id;
|
||||
};
|
||||
|
||||
using HandlerPtr = std::shared_ptr<Handler>;
|
||||
|
||||
static EventNotifier & init();
|
||||
static EventNotifier & instance();
|
||||
static void shutdown();
|
||||
|
||||
template <typename EventType, typename Callback>
|
||||
[[ nodiscard ]] HandlerPtr subscribe(EventType event, Callback && callback)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto event_id = DefaultHash64(event);
|
||||
auto callback_id = calculateIdentifier(event_id, ++counter);
|
||||
|
||||
callback_table[event_id].insert(callback_id);
|
||||
storage[callback_id] = std::forward<Callback>(callback);
|
||||
|
||||
return std::make_shared<Handler>(*this, event_id, callback_id);
|
||||
}
|
||||
|
||||
template <typename EventType>
|
||||
void notify(EventType event)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
for (const auto & identifier : callback_table[DefaultHash64(event)])
|
||||
storage[identifier]();
|
||||
}
|
||||
|
||||
private:
|
||||
// To move boost include for .h file
|
||||
static size_t calculateIdentifier(size_t a, size_t b);
|
||||
|
||||
using CallbackType = std::function<void()>;
|
||||
using CallbackStorage = std::map<size_t, CallbackType>;
|
||||
using EventToCallbacks = std::map<size_t, std::set<size_t>>;
|
||||
|
||||
std::mutex mutex;
|
||||
|
||||
EventToCallbacks callback_table;
|
||||
CallbackStorage storage;
|
||||
size_t counter{0};
|
||||
|
||||
static std::unique_ptr<EventNotifier> event_notifier;
|
||||
};
|
||||
|
||||
}
|
@ -139,12 +139,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
init(args_);
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
|
@ -1,15 +1,16 @@
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <base/getThreadId.h>
|
||||
|
||||
#include <Common/config.h>
|
||||
@ -874,7 +875,11 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
|
||||
/// No new requests will appear in queue after finish()
|
||||
bool was_already_finished = requests_queue.finish();
|
||||
if (!was_already_finished)
|
||||
{
|
||||
active_session_metric_increment.destroy();
|
||||
/// Notify all subscribers (ReplicatedMergeTree tables) about expired session
|
||||
EventNotifier::instance().notify(Error::ZSESSIONEXPIRED);
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
|
53
src/Common/tests/gtest_event_notifier.cpp
Normal file
53
src/Common/tests/gtest_event_notifier.cpp
Normal file
@ -0,0 +1,53 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
|
||||
|
||||
TEST(EventNotifier, SimpleTest)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
size_t result = 1;
|
||||
EventNotifier::init();
|
||||
|
||||
auto handler3 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 3; });
|
||||
|
||||
{
|
||||
auto handler5 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 5; });
|
||||
}
|
||||
|
||||
auto handler7 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 7; });
|
||||
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 21);
|
||||
|
||||
result = 1;
|
||||
handler3.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 7);
|
||||
|
||||
auto handler11 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 11; });
|
||||
|
||||
result = 1;
|
||||
handler7.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 11);
|
||||
|
||||
EventNotifier::HandlerPtr handler13;
|
||||
{
|
||||
handler13 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 13; });
|
||||
}
|
||||
|
||||
result = 1;
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 143);
|
||||
|
||||
result = 1;
|
||||
handler11.reset();
|
||||
handler13.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 1);
|
||||
|
||||
EventNotifier::shutdown();
|
||||
}
|
@ -245,8 +245,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
|
||||
credentials.setPassword(named_collection->configuration.password);
|
||||
|
||||
header_entries.reserve(named_collection->configuration.headers.size());
|
||||
for (const auto & header : named_collection->configuration.headers)
|
||||
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>()));
|
||||
for (const auto & [key, value] : named_collection->configuration.headers)
|
||||
header_entries.emplace_back(std::make_tuple(key, value));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
@ -53,7 +54,31 @@ const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
|
||||
|
||||
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
|
||||
{
|
||||
return object_storage.files.contains(path);
|
||||
fs::path fs_path(path);
|
||||
if (fs_path.has_extension())
|
||||
fs_path = fs_path.parent_path();
|
||||
|
||||
initializeIfNeeded(fs_path, false);
|
||||
|
||||
if (object_storage.files.empty())
|
||||
return false;
|
||||
|
||||
if (object_storage.files.contains(path))
|
||||
return true;
|
||||
|
||||
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
|
||||
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
|
||||
auto it = std::lower_bound(
|
||||
object_storage.files.begin(),
|
||||
object_storage.files.end(),
|
||||
path,
|
||||
[](const auto & file, const std::string & path_) { return file.first < path_; }
|
||||
);
|
||||
if (startsWith(it->first, path)
|
||||
|| (it != object_storage.files.begin() && startsWith(std::prev(it)->first, path)))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
|
||||
@ -98,7 +123,10 @@ uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & pat
|
||||
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
|
||||
{
|
||||
assertExists(path);
|
||||
return {StoredObject::create(object_storage, path, object_storage.files.at(path).size, true)};
|
||||
auto fs_path = fs::path(object_storage.url) / path;
|
||||
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||
remote_path = remote_path.substr(object_storage.url.size());
|
||||
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const
|
||||
@ -112,7 +140,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
|
||||
return result;
|
||||
}
|
||||
|
||||
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const
|
||||
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
|
||||
{
|
||||
if (object_storage.files.find(path) == object_storage.files.end())
|
||||
{
|
||||
@ -123,7 +151,7 @@ bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::stri
|
||||
catch (...)
|
||||
{
|
||||
const auto message = getCurrentExceptionMessage(false);
|
||||
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
||||
bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
||||
if (can_throw)
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
|
||||
|
||||
@ -140,13 +168,17 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
|
||||
std::vector<fs::path> dir_file_paths;
|
||||
|
||||
if (!initializeIfNeeded(path))
|
||||
{
|
||||
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
|
||||
}
|
||||
|
||||
assertExists(path);
|
||||
|
||||
for (const auto & [file_path, _] : object_storage.files)
|
||||
if (parentPath(file_path) == path)
|
||||
{
|
||||
if (fs::path(parentPath(file_path)) / "" == fs::path(path) / "")
|
||||
dir_file_paths.emplace_back(file_path);
|
||||
}
|
||||
|
||||
LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size());
|
||||
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
|
||||
|
@ -19,7 +19,7 @@ private:
|
||||
|
||||
void assertExists(const std::string & path) const;
|
||||
|
||||
bool initializeIfNeeded(const std::string & path) const;
|
||||
bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);
|
||||
|
@ -30,7 +30,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int NETWORK_ERROR;
|
||||
}
|
||||
|
||||
@ -153,20 +152,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
const auto & path = object.absolute_path;
|
||||
LOG_TRACE(log, "Read from path: {}", path);
|
||||
|
||||
auto iter = files.find(path);
|
||||
if (iter == files.end())
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
|
||||
|
||||
auto fs_path = fs::path(url) / path;
|
||||
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||
remote_path = remote_path.string().substr(url.size());
|
||||
|
||||
StoredObjects objects;
|
||||
objects.emplace_back(remote_path, iter->second.size);
|
||||
|
||||
auto read_buffer_creator =
|
||||
[this, read_settings]
|
||||
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
|
||||
@ -179,7 +164,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
read_until_position);
|
||||
};
|
||||
|
||||
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings);
|
||||
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
|
||||
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
|
@ -114,7 +114,7 @@ protected:
|
||||
size_t size = 0;
|
||||
};
|
||||
|
||||
using Files = std::unordered_map<String, FileData>; /// file path -> file data
|
||||
using Files = std::map<String, FileData>; /// file path -> file data
|
||||
mutable Files files;
|
||||
|
||||
String url;
|
||||
|
@ -274,6 +274,8 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
|
||||
}
|
||||
|
||||
/// In case of error this address will be written to logs
|
||||
request.SetResolvedRemoteHost(session->getResolvedAddress());
|
||||
|
||||
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
|
||||
|
||||
|
@ -114,6 +114,7 @@ struct URI
|
||||
bool is_virtual_hosted_style;
|
||||
|
||||
explicit URI(const Poco::URI & uri_);
|
||||
explicit URI(const std::string & uri_) : URI(Poco::URI(uri_)) {}
|
||||
|
||||
static void validateBucket(const String & bucket, const Poco::URI & uri);
|
||||
};
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/formatReadable.h>
|
||||
@ -513,6 +514,7 @@ void Context::initGlobal()
|
||||
assert(!global_context_instance);
|
||||
global_context_instance = shared_from_this();
|
||||
DatabaseCatalog::init(shared_from_this());
|
||||
EventNotifier::init();
|
||||
}
|
||||
|
||||
SharedContextHolder Context::createShared()
|
||||
|
@ -307,7 +307,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
||||
{
|
||||
const auto header_prefix = headers_prefix + header;
|
||||
configuration.headers.emplace_back(
|
||||
std::make_pair(headers_config->getString(header_prefix + ".name"), headers_config->getString(header_prefix + ".value")));
|
||||
headers_config->getString(header_prefix + ".name"),
|
||||
headers_config->getString(header_prefix + ".value"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -446,7 +447,9 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
|
||||
for (const auto & header : header_keys)
|
||||
{
|
||||
const auto header_prefix = config_prefix + ".headers." + header;
|
||||
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
|
||||
configuration.headers.emplace_back(
|
||||
config.getString(header_prefix + ".name"),
|
||||
config.getString(header_prefix + ".value"));
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Storages/HeaderCollection.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -108,7 +109,7 @@ struct URLBasedDataSourceConfiguration
|
||||
String user;
|
||||
String password;
|
||||
|
||||
std::vector<std::pair<String, Field>> headers;
|
||||
HeaderCollection headers;
|
||||
String http_method;
|
||||
|
||||
void set(const URLBasedDataSourceConfiguration & conf);
|
||||
|
18
src/Storages/HeaderCollection.h
Normal file
18
src/Storages/HeaderCollection.h
Normal file
@ -0,0 +1,18 @@
|
||||
#pragma once
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct HttpHeader
|
||||
{
|
||||
std::string name;
|
||||
std::string value;
|
||||
|
||||
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
|
||||
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
|
||||
};
|
||||
|
||||
using HeaderCollection = std::vector<HttpHeader>;
|
||||
|
||||
}
|
@ -1467,6 +1467,8 @@ bool MutateTask::execute()
|
||||
}
|
||||
case State::NEED_EXECUTE:
|
||||
{
|
||||
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
|
||||
|
||||
if (task->executeStep())
|
||||
return true;
|
||||
|
||||
|
@ -4185,6 +4185,11 @@ void StorageReplicatedMergeTree::startupImpl()
|
||||
|
||||
/// In this thread replica will be activated.
|
||||
restarting_thread.start();
|
||||
/// And this is just a callback
|
||||
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
|
||||
{
|
||||
restarting_thread.start();
|
||||
});
|
||||
|
||||
/// Wait while restarting_thread finishing initialization.
|
||||
/// NOTE It does not mean that replication is actually started after receiving this event.
|
||||
@ -4228,6 +4233,8 @@ void StorageReplicatedMergeTree::shutdown()
|
||||
if (shutdown_called.exchange(true))
|
||||
return;
|
||||
|
||||
session_expired_callback_handler.reset();
|
||||
|
||||
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
|
||||
fetcher.blocker.cancelForever();
|
||||
merger_mutator.merges_blocker.cancelForever();
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <base/defines.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
@ -453,6 +454,7 @@ private:
|
||||
|
||||
/// A thread that processes reconnection to ZooKeeper when the session expires.
|
||||
ReplicatedMergeTreeRestartingThread restarting_thread;
|
||||
EventNotifier::HandlerPtr session_expired_callback_handler;
|
||||
|
||||
/// A thread that attaches the table using ZooKeeper
|
||||
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Storages/getVirtualsForStorage.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
@ -767,33 +768,28 @@ private:
|
||||
|
||||
|
||||
StorageS3::StorageS3(
|
||||
const S3::URI & uri_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const String & compression_method_,
|
||||
bool distributed_processing_,
|
||||
ASTPtr partition_by_)
|
||||
: IStorage(table_id_)
|
||||
, s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later
|
||||
, keys({uri_.key})
|
||||
, format_name(format_name_)
|
||||
, compression_method(compression_method_)
|
||||
, name(uri_.storage_name)
|
||||
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, keys({s3_configuration.uri.key})
|
||||
, format_name(configuration_.format)
|
||||
, compression_method(configuration_.compression_method)
|
||||
, name(s3_configuration.uri.storage_name)
|
||||
, distributed_processing(distributed_processing_)
|
||||
, format_settings(format_settings_)
|
||||
, partition_by(partition_by_)
|
||||
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
|
||||
, is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos)
|
||||
{
|
||||
FormatFactory::instance().checkFormatName(format_name);
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri);
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
||||
updateS3Configuration(context_, s3_configuration);
|
||||
@ -1062,47 +1058,48 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
|
||||
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
|
||||
{
|
||||
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
|
||||
const auto & config_rw_settings = settings.rw_settings;
|
||||
|
||||
bool need_update_configuration = settings != S3Settings{};
|
||||
if (need_update_configuration)
|
||||
{
|
||||
if (upd.rw_settings != settings.rw_settings)
|
||||
upd.rw_settings = settings.rw_settings;
|
||||
}
|
||||
if (upd.rw_settings != config_rw_settings)
|
||||
upd.rw_settings = settings.rw_settings;
|
||||
|
||||
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
|
||||
|
||||
if (upd.client && (!upd.access_key_id.empty() || settings.auth_settings == upd.auth_settings))
|
||||
return;
|
||||
|
||||
Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
|
||||
HeaderCollection headers;
|
||||
if (upd.access_key_id.empty())
|
||||
if (upd.client)
|
||||
{
|
||||
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
|
||||
headers = settings.auth_settings.headers;
|
||||
if (upd.static_configuration)
|
||||
return;
|
||||
|
||||
if (settings.auth_settings == upd.auth_settings)
|
||||
return;
|
||||
}
|
||||
|
||||
upd.auth_settings.updateFrom(settings.auth_settings);
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
settings.auth_settings.region,
|
||||
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
|
||||
upd.auth_settings.region,
|
||||
ctx->getRemoteHostFilter(),
|
||||
ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
|
||||
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = upd.uri.endpoint;
|
||||
client_configuration.maxConnections = upd.rw_settings.max_connections;
|
||||
|
||||
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
|
||||
auto headers = upd.auth_settings.headers;
|
||||
if (!upd.headers_from_ast.empty())
|
||||
headers.insert(headers.end(), upd.headers_from_ast.begin(), upd.headers_from_ast.end());
|
||||
|
||||
upd.client = S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
upd.uri.is_virtual_hosted_style,
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
settings.auth_settings.server_side_encryption_customer_key_base64,
|
||||
upd.auth_settings.server_side_encryption_customer_key_base64,
|
||||
std::move(headers),
|
||||
settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
|
||||
settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
|
||||
|
||||
upd.auth_settings = std::move(settings.auth_settings);
|
||||
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
|
||||
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
|
||||
}
|
||||
|
||||
|
||||
@ -1155,6 +1152,10 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
|
||||
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
|
||||
if (header_it != engine_args.end())
|
||||
engine_args.erase(header_it);
|
||||
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
|
||||
|
||||
@ -1184,19 +1185,23 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
|
||||
}
|
||||
|
||||
ColumnsDescription StorageS3::getTableStructureFromData(
|
||||
const String & format,
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & compression_method,
|
||||
const StorageS3Configuration & configuration,
|
||||
bool distributed_processing,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos)
|
||||
{
|
||||
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
|
||||
S3Configuration s3_configuration{
|
||||
configuration.url,
|
||||
configuration.auth_settings,
|
||||
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
|
||||
configuration.headers};
|
||||
|
||||
updateS3Configuration(ctx, s3_configuration);
|
||||
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
|
||||
|
||||
return getTableStructureFromDataImpl(
|
||||
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
|
||||
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
@ -1308,25 +1313,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
|
||||
format_settings = getFormatSettings(args.getContext());
|
||||
}
|
||||
|
||||
S3::URI s3_uri(Poco::URI(configuration.url));
|
||||
|
||||
ASTPtr partition_by;
|
||||
if (args.storage_def->partition_by)
|
||||
partition_by = args.storage_def->partition_by->clone();
|
||||
|
||||
return std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
args.table_id,
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
args.columns,
|
||||
args.constraints,
|
||||
args.comment,
|
||||
args.getContext(),
|
||||
format_settings,
|
||||
configuration.compression_method,
|
||||
/* distributed_processing_ */false,
|
||||
partition_by);
|
||||
},
|
||||
|
@ -149,18 +149,13 @@ class StorageS3 : public IStorage, WithContext
|
||||
{
|
||||
public:
|
||||
StorageS3(
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const String & compression_method_ = "",
|
||||
bool distributed_processing_ = false,
|
||||
ASTPtr partition_by_ = nullptr);
|
||||
|
||||
@ -189,11 +184,7 @@ public:
|
||||
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const String & format,
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & compression_method,
|
||||
const StorageS3Configuration & configuration,
|
||||
bool distributed_processing,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx,
|
||||
@ -204,11 +195,28 @@ public:
|
||||
struct S3Configuration
|
||||
{
|
||||
const S3::URI uri;
|
||||
const String access_key_id;
|
||||
const String secret_access_key;
|
||||
std::shared_ptr<const Aws::S3::S3Client> client;
|
||||
|
||||
S3Settings::AuthSettings auth_settings;
|
||||
S3Settings::ReadWriteSettings rw_settings;
|
||||
|
||||
/// If s3 configuration was passed from ast, then it is static.
|
||||
/// If from config - it can be changed with config reload.
|
||||
bool static_configuration = true;
|
||||
|
||||
/// Headers from ast is a part of static configuration.
|
||||
HeaderCollection headers_from_ast;
|
||||
|
||||
S3Configuration(
|
||||
const String & url_,
|
||||
const S3Settings::AuthSettings & auth_settings_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const HeaderCollection & headers_from_ast_)
|
||||
: uri(S3::URI(url_))
|
||||
, auth_settings(auth_settings_)
|
||||
, rw_settings(rw_settings_)
|
||||
, static_configuration(!auth_settings_.access_key_id.empty())
|
||||
, headers_from_ast(headers_from_ast_) {}
|
||||
};
|
||||
|
||||
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
|
||||
|
@ -46,22 +46,17 @@
|
||||
namespace DB
|
||||
{
|
||||
StorageS3Cluster::StorageS3Cluster(
|
||||
const String & filename_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3ClusterConfiguration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
String cluster_name_,
|
||||
const String & format_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
const String & compression_method_)
|
||||
ContextPtr context_)
|
||||
: IStorage(table_id_)
|
||||
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())}
|
||||
, filename(filename_)
|
||||
, cluster_name(cluster_name_)
|
||||
, format_name(format_name_)
|
||||
, compression_method(compression_method_)
|
||||
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, filename(configuration_.url)
|
||||
, cluster_name(configuration_.cluster_name)
|
||||
, format_name(configuration_.format)
|
||||
, compression_method(configuration_.compression_method)
|
||||
{
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
@ -21,16 +21,11 @@ class StorageS3Cluster : public IStorage
|
||||
{
|
||||
public:
|
||||
StorageS3Cluster(
|
||||
const String & filename_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3ClusterConfiguration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
String cluster_name_,
|
||||
const String & format_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
const String & compression_method_);
|
||||
ContextPtr context_);
|
||||
|
||||
std::string getName() const override { return "S3Cluster"; }
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <vector>
|
||||
#include <base/types.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Storages/HeaderCollection.h>
|
||||
|
||||
namespace Poco::Util
|
||||
{
|
||||
@ -15,15 +16,6 @@ class AbstractConfiguration;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct HttpHeader
|
||||
{
|
||||
String name;
|
||||
String value;
|
||||
|
||||
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
|
||||
};
|
||||
|
||||
using HeaderCollection = std::vector<HttpHeader>;
|
||||
|
||||
struct Settings;
|
||||
|
||||
@ -50,6 +42,21 @@ struct S3Settings
|
||||
&& use_environment_credentials == other.use_environment_credentials
|
||||
&& use_insecure_imds_request == other.use_insecure_imds_request;
|
||||
}
|
||||
|
||||
void updateFrom(const AuthSettings & from)
|
||||
{
|
||||
/// Update with check for emptyness only parameters which
|
||||
/// can be passed not only from config, but via ast.
|
||||
|
||||
if (!from.access_key_id.empty())
|
||||
access_key_id = from.access_key_id;
|
||||
if (!from.secret_access_key.empty())
|
||||
secret_access_key = from.secret_access_key;
|
||||
|
||||
headers = from.headers;
|
||||
region = from.region;
|
||||
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
|
||||
}
|
||||
};
|
||||
|
||||
struct ReadWriteSettings
|
||||
@ -94,7 +101,6 @@ struct S3Settings
|
||||
class StorageS3Settings
|
||||
{
|
||||
public:
|
||||
StorageS3Settings() = default;
|
||||
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
|
||||
|
||||
S3Settings getSettings(const String & endpoint) const;
|
||||
|
@ -1018,7 +1018,7 @@ ASTs::iterator StorageURL::collectHeaders(
|
||||
if (arg_value.getType() != Field::Types::Which::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
|
||||
|
||||
configuration.headers.emplace_back(arg_name, arg_value);
|
||||
configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
|
||||
}
|
||||
|
||||
headers_it = arg_it;
|
||||
@ -1096,10 +1096,9 @@ void registerStorageURL(StorageFactory & factory)
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
||||
for (const auto & [header, value] : configuration.headers)
|
||||
{
|
||||
auto value_literal = value.safeGet<String>();
|
||||
if (header == "Range")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||
headers.emplace_back(std::make_pair(header, value_literal));
|
||||
headers.emplace_back(header, value);
|
||||
}
|
||||
|
||||
ASTPtr partition_by;
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include "registerTableFunctions.h"
|
||||
#include <filesystem>
|
||||
@ -40,6 +41,10 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
|
||||
if (args.empty() || args.size() > 6)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
|
||||
|
||||
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
|
||||
if (header_it != args.end())
|
||||
args.erase(header_it);
|
||||
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
@ -135,15 +140,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration.compression_method,
|
||||
false,
|
||||
std::nullopt,
|
||||
context);
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
@ -161,19 +158,14 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
|
||||
columns = structure_hint;
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
String{},
|
||||
context,
|
||||
/// No format_settings for table function S3
|
||||
std::nullopt,
|
||||
configuration.compression_method);
|
||||
std::nullopt);
|
||||
|
||||
storage->startup();
|
||||
|
||||
|
@ -82,19 +82,10 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
|
||||
|
||||
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration.compression_method,
|
||||
false,
|
||||
std::nullopt,
|
||||
context);
|
||||
}
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
@ -104,46 +95,38 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
|
||||
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
StoragePtr storage;
|
||||
|
||||
ColumnsDescription columns;
|
||||
|
||||
if (configuration.structure != "auto")
|
||||
{
|
||||
columns = parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
else if (!structure_hint.empty())
|
||||
{
|
||||
columns = structure_hint;
|
||||
}
|
||||
|
||||
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
|
||||
{
|
||||
/// On worker node this filename won't contains globs
|
||||
Poco::URI uri (configuration.url);
|
||||
S3::URI s3_uri (uri);
|
||||
storage = std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
String{},
|
||||
/* comment */String{},
|
||||
context,
|
||||
// No format_settings for S3Cluster
|
||||
std::nullopt,
|
||||
configuration.compression_method,
|
||||
/* format_settings */std::nullopt, /// No format_settings for S3Cluster
|
||||
/*distributed_processing=*/true);
|
||||
}
|
||||
else
|
||||
{
|
||||
storage = std::make_shared<StorageS3Cluster>(
|
||||
configuration.url,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.cluster_name, configuration.format,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
context,
|
||||
configuration.compression_method);
|
||||
context);
|
||||
}
|
||||
|
||||
storage->startup();
|
||||
|
@ -103,10 +103,9 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
||||
for (const auto & [header, value] : configuration.headers)
|
||||
{
|
||||
auto value_literal = value.safeGet<String>();
|
||||
if (header == "Range")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||
headers.emplace_back(std::make_pair(header, value_literal));
|
||||
headers.emplace_back(header, value);
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
@ -25,19 +25,17 @@ def cluster():
|
||||
global uuids
|
||||
for i in range(3):
|
||||
node1.query(
|
||||
""" CREATE TABLE data{} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def';""".format(
|
||||
i
|
||||
)
|
||||
f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
|
||||
)
|
||||
node1.query(
|
||||
"INSERT INTO data{} SELECT number FROM numbers(500000 * {})".format(
|
||||
i, i + 1
|
||||
|
||||
for _ in range(10):
|
||||
node1.query(
|
||||
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
|
||||
)
|
||||
)
|
||||
expected = node1.query("SELECT * FROM data{} ORDER BY id".format(i))
|
||||
expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
|
||||
|
||||
metadata_path = node1.query(
|
||||
"SELECT data_paths FROM system.tables WHERE name='data{}'".format(i)
|
||||
f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
|
||||
)
|
||||
metadata_path = metadata_path[
|
||||
metadata_path.find("/") : metadata_path.rfind("/") + 1
|
||||
@ -84,7 +82,7 @@ def test_usage(cluster, node_name):
|
||||
result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i))
|
||||
|
||||
result = node2.query("SELECT count() FROM test{}".format(i))
|
||||
assert int(result) == 500000 * (i + 1)
|
||||
assert int(result) == 5000000 * (i + 1)
|
||||
|
||||
result = node2.query(
|
||||
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
|
||||
@ -123,7 +121,7 @@ def test_incorrect_usage(cluster):
|
||||
)
|
||||
|
||||
result = node2.query("SELECT count() FROM test0")
|
||||
assert int(result) == 500000
|
||||
assert int(result) == 5000000
|
||||
|
||||
result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first")
|
||||
assert "Table is read-only" in result
|
||||
@ -169,7 +167,7 @@ def test_cache(cluster, node_name):
|
||||
assert int(result) > 0
|
||||
|
||||
result = node2.query("SELECT count() FROM test{}".format(i))
|
||||
assert int(result) == 500000 * (i + 1)
|
||||
assert int(result) == 5000000 * (i + 1)
|
||||
|
||||
result = node2.query(
|
||||
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
|
||||
|
@ -3,6 +3,12 @@ import time
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
# FIXME Test is temporarily disabled due to flakyness
|
||||
# https://github.com/ClickHouse/ClickHouse/issues/39700
|
||||
|
||||
pytestmark = pytest.mark.skip
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.utility import generate_values, replace_config, SafeThread
|
||||
|
||||
|
@ -110,6 +110,10 @@ def started_cluster():
|
||||
main_configs=["configs/defaultS3.xml"],
|
||||
user_configs=["configs/s3_max_redirects.xml"],
|
||||
)
|
||||
cluster.add_instance(
|
||||
"s3_non_default",
|
||||
with_minio=True,
|
||||
)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
@ -1689,3 +1693,22 @@ def test_schema_inference_cache(started_cluster):
|
||||
|
||||
test("s3")
|
||||
test("url")
|
||||
|
||||
|
||||
def test_ast_auth_headers(started_cluster):
|
||||
bucket = started_cluster.minio_restricted_bucket
|
||||
instance = started_cluster.instances["s3_non_default"] # type: ClickHouseInstance
|
||||
filename = "test.csv"
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV')"
|
||||
)
|
||||
|
||||
assert "Forbidden Error" in result
|
||||
assert "S3_ERROR" in result
|
||||
|
||||
result = instance.query(
|
||||
f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
|
||||
)
|
||||
|
||||
assert result.strip() == "1\t2\t3"
|
||||
|
Loading…
Reference in New Issue
Block a user