Merge branch 'master' into cache-config-allow-readable-format-settings

Kseniia Sumarokova 2022-09-23 15:40:07 +02:00 committed by GitHub
commit b205824670
47 changed files with 815 additions and 202 deletions

View File

@ -163,7 +163,7 @@
* Fix `base58Encode / base58Decode` handling leading 0 / '1'. [#40620](https://github.com/ClickHouse/ClickHouse/pull/40620) ([Andrey Zvonov](https://github.com/zvonand)). * Fix `base58Encode / base58Decode` handling leading 0 / '1'. [#40620](https://github.com/ClickHouse/ClickHouse/pull/40620) ([Andrey Zvonov](https://github.com/zvonand)).
* keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)). * keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix short circuit execution of toFixedString function. Solves (partially) [#40622](https://github.com/ClickHouse/ClickHouse/issues/40622). [#40628](https://github.com/ClickHouse/ClickHouse/pull/40628) ([Kruglov Pavel](https://github.com/Avogar)). * Fix short circuit execution of toFixedString function. Solves (partially) [#40622](https://github.com/ClickHouse/ClickHouse/issues/40622). [#40628](https://github.com/ClickHouse/ClickHouse/pull/40628) ([Kruglov Pavel](https://github.com/Avogar)).
* - Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)). * Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
* Fix stack overflow in recursive `Buffer` tables. This closes [#40637](https://github.com/ClickHouse/ClickHouse/issues/40637). [#40643](https://github.com/ClickHouse/ClickHouse/pull/40643) ([Alexey Milovidov](https://github.com/alexey-milovidov)). * Fix stack overflow in recursive `Buffer` tables. This closes [#40637](https://github.com/ClickHouse/ClickHouse/issues/40637). [#40643](https://github.com/ClickHouse/ClickHouse/pull/40643) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)). * During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)).
* Fix LOGICAL_ERROR with max_read_buffer_size=0 during reading marks. [#40705](https://github.com/ClickHouse/ClickHouse/pull/40705) ([Azat Khuzhin](https://github.com/azat)). * Fix LOGICAL_ERROR with max_read_buffer_size=0 during reading marks. [#40705](https://github.com/ClickHouse/ClickHouse/pull/40705) ([Azat Khuzhin](https://github.com/azat)).
@ -174,7 +174,7 @@
* In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)). * In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix incremental backups for Log family. [#40827](https://github.com/ClickHouse/ClickHouse/pull/40827) ([Vitaly Baranov](https://github.com/vitlibar)). * Fix incremental backups for Log family. [#40827](https://github.com/ClickHouse/ClickHouse/pull/40827) ([Vitaly Baranov](https://github.com/vitlibar)).
* Fix extremely rare bug which can lead to potential data loss in zero-copy replication. [#40844](https://github.com/ClickHouse/ClickHouse/pull/40844) ([alesapin](https://github.com/alesapin)). * Fix extremely rare bug which can lead to potential data loss in zero-copy replication. [#40844](https://github.com/ClickHouse/ClickHouse/pull/40844) ([alesapin](https://github.com/alesapin)).
* - Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)). * Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
* Fix nested JSON Objects schema inference. [#40851](https://github.com/ClickHouse/ClickHouse/pull/40851) ([Kruglov Pavel](https://github.com/Avogar)). * Fix nested JSON Objects schema inference. [#40851](https://github.com/ClickHouse/ClickHouse/pull/40851) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix 3-digit prefix directory for filesystem cache files not being deleted if empty. Closes [#40797](https://github.com/ClickHouse/ClickHouse/issues/40797). [#40867](https://github.com/ClickHouse/ClickHouse/pull/40867) ([Kseniia Sumarokova](https://github.com/kssenii)). * Fix 3-digit prefix directory for filesystem cache files not being deleted if empty. Closes [#40797](https://github.com/ClickHouse/ClickHouse/issues/40797). [#40867](https://github.com/ClickHouse/ClickHouse/pull/40867) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Fix uncaught DNS_ERROR on failed connection to replicas. [#40881](https://github.com/ClickHouse/ClickHouse/pull/40881) ([Robert Coelho](https://github.com/coelho)). * Fix uncaught DNS_ERROR on failed connection to replicas. [#40881](https://github.com/ClickHouse/ClickHouse/pull/40881) ([Robert Coelho](https://github.com/coelho)).

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 9fec8e11dbb6a352e1cfba8cc9e23ebd7fb77310 Subproject commit 76746b35d0e254eaaba71dc3b79e46cba8cbb144

View File

@ -106,8 +106,8 @@ fi
if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then
# port is needed to check if clickhouse-server is ready for connections # port is needed to check if clickhouse-server is ready for connections
HTTP_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=http_port)" HTTP_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=http_port --try)"
HTTPS_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=https_port)" HTTPS_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=https_port --try)"
if [ -n "$HTTP_PORT" ]; then if [ -n "$HTTP_PORT" ]; then
URL="http://127.0.0.1:$HTTP_PORT/ping" URL="http://127.0.0.1:$HTTP_PORT/ping"

View File

@ -462,8 +462,9 @@
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path> <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<!-- Disable AuthType plaintext_password and no_password for ACL. --> <!-- Disable AuthType plaintext_password and no_password for ACL. -->
<!-- <allow_plaintext_password>0</allow_plaintext_password> --> <allow_plaintext_password>1</allow_plaintext_password>
<!-- <allow_no_password>0</allow_no_password> -->` <allow_no_password>1</allow_no_password>
<allow_implicit_no_password>1</allow_implicit_no_password>
<!-- Policy from the <storage_configuration> for the temporary files. <!-- Policy from the <storage_configuration> for the temporary files.
If not set <tmp_path> is used, otherwise <tmp_path> is ignored. If not set <tmp_path> is used, otherwise <tmp_path> is ignored.

View File

@ -162,6 +162,7 @@ void AccessControl::setUpFromMainConfig(const Poco::Util::AbstractConfiguration
if (config_.has("custom_settings_prefixes")) if (config_.has("custom_settings_prefixes"))
setCustomSettingsPrefixes(config_.getString("custom_settings_prefixes")); setCustomSettingsPrefixes(config_.getString("custom_settings_prefixes"));
setImplicitNoPasswordAllowed(config_.getBool("allow_implicit_no_password", true));
setNoPasswordAllowed(config_.getBool("allow_no_password", true)); setNoPasswordAllowed(config_.getBool("allow_no_password", true));
setPlaintextPasswordAllowed(config_.getBool("allow_plaintext_password", true)); setPlaintextPasswordAllowed(config_.getBool("allow_plaintext_password", true));
@ -499,6 +500,15 @@ void AccessControl::checkSettingNameIsAllowed(const std::string_view setting_nam
custom_settings_prefixes->checkSettingNameIsAllowed(setting_name); custom_settings_prefixes->checkSettingNameIsAllowed(setting_name);
} }
void AccessControl::setImplicitNoPasswordAllowed(bool allow_implicit_no_password_)
{
allow_implicit_no_password = allow_implicit_no_password_;
}
bool AccessControl::isImplicitNoPasswordAllowed() const
{
return allow_implicit_no_password;
}
void AccessControl::setNoPasswordAllowed(bool allow_no_password_) void AccessControl::setNoPasswordAllowed(bool allow_no_password_)
{ {

View File

@ -134,6 +134,11 @@ public:
bool isSettingNameAllowed(const std::string_view name) const; bool isSettingNameAllowed(const std::string_view name) const;
void checkSettingNameIsAllowed(const std::string_view name) const; void checkSettingNameIsAllowed(const std::string_view name) const;
/// Allows implicit user creation without password (by default it's allowed).
/// In other words, allow 'CREATE USER' queries without 'IDENTIFIED WITH' clause.
void setImplicitNoPasswordAllowed(const bool allow_implicit_no_password_);
bool isImplicitNoPasswordAllowed() const;
/// Allows users without password (by default it's allowed). /// Allows users without password (by default it's allowed).
void setNoPasswordAllowed(const bool allow_no_password_); void setNoPasswordAllowed(const bool allow_no_password_);
bool isNoPasswordAllowed() const; bool isNoPasswordAllowed() const;
@ -222,6 +227,7 @@ private:
std::unique_ptr<AccessChangesNotifier> changes_notifier; std::unique_ptr<AccessChangesNotifier> changes_notifier;
std::atomic_bool allow_plaintext_password = true; std::atomic_bool allow_plaintext_password = true;
std::atomic_bool allow_no_password = true; std::atomic_bool allow_no_password = true;
std::atomic_bool allow_implicit_no_password = true;
std::atomic_bool users_without_row_policies_can_read_rows = false; std::atomic_bool users_without_row_policies_can_read_rows = false;
std::atomic_bool on_cluster_queries_require_cluster_grant = false; std::atomic_bool on_cluster_queries_require_cluster_grant = false;
std::atomic_bool select_from_system_db_requires_grant = false; std::atomic_bool select_from_system_db_requires_grant = false;

View File

@ -7,11 +7,15 @@
#include <base/unaligned.h> #include <base/unaligned.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Common/TargetSpecific.h>
#include <Core/TypeId.h> #include <Core/TypeId.h>
#include <base/TypeName.h> #include <base/TypeName.h>
#include "config_core.h" #include "config_core.h"
#if USE_MULTITARGET_CODE
# include <immintrin.h>
#endif
namespace DB namespace DB
{ {
@ -391,6 +395,124 @@ protected:
Container data; Container data;
}; };
DECLARE_DEFAULT_CODE(
template <typename Container, typename Type>
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
{
for (size_t i = 0; i < limit; ++i)
res_data[i] = data[indexes[i]];
}
);
DECLARE_AVX512VBMI_SPECIFIC_CODE(
template <typename Container, typename Type>
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
{
static constexpr UInt64 MASK64 = 0xffffffffffffffff;
const size_t limit64 = limit & ~63;
size_t pos = 0;
size_t data_size = data.size();
auto data_pos = reinterpret_cast<const UInt8 *>(data.data());
auto indexes_pos = reinterpret_cast<const UInt8 *>(indexes.data());
auto res_pos = reinterpret_cast<UInt8 *>(res_data.data());
if (data_size <= 64)
{
/// one single mask load for table size <= 64
__mmask64 last_mask = MASK64 >> (64 - data_size);
__m512i table1 = _mm512_maskz_loadu_epi8(last_mask, data_pos);
/// 64 bytes table lookup using one single permutexvar_epi8
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
/// tail handling
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
else if (data_size <= 128)
{
/// table size (64, 128] requires 2 zmm load
__mmask64 last_mask = MASK64 >> (128 - data_size);
__m512i table1 = _mm512_loadu_epi8(data_pos);
__m512i table2 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 64);
/// 128 bytes table lookup using one single permutex2var_epi8
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
else
{
if (data_size > 256)
{
/// byte index will not exceed 256 boundary.
data_size = 256;
}
__m512i table1 = _mm512_loadu_epi8(data_pos);
__m512i table2 = _mm512_loadu_epi8(data_pos + 64);
__m512i table3, table4;
if (data_size <= 192)
{
/// only 3 tables need to load if size <= 192
__mmask64 last_mask = MASK64 >> (192 - data_size);
table3 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 128);
table4 = _mm512_setzero_si512();
}
else
{
__mmask64 last_mask = MASK64 >> (256 - data_size);
table3 = _mm512_loadu_epi8(data_pos + 128);
table4 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 192);
}
/// 256 bytes table lookup can use: 2 permutex2var_epi8 plus 1 blend with MSB
while (pos < limit64)
{
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
__mmask64 msb = _mm512_movepi8_mask(vidx);
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
_mm512_storeu_epi8(res_pos + pos, out);
pos += 64;
}
if (limit > limit64)
{
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
__mmask64 msb = _mm512_movepi8_mask(vidx);
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
}
}
}
);
template <typename T> template <typename T>
template <typename Type> template <typename Type>
ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
@ -399,8 +521,18 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
auto res = this->create(limit); auto res = this->create(limit);
typename Self::Container & res_data = res->getData(); typename Self::Container & res_data = res->getData();
for (size_t i = 0; i < limit; ++i) #if USE_MULTITARGET_CODE
res_data[i] = data[indexes[i]]; if constexpr (sizeof(T) == 1 && sizeof(Type) == 1)
{
/// VBMI optimization only applicable for (U)Int8 types
if (isArchSupported(TargetArch::AVX512VBMI))
{
TargetSpecific::AVX512VBMI::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
return res;
}
}
#endif
TargetSpecific::Default::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
return res; return res;
} }
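
For reference, the 256-byte branch of the AVX512VBMI kernel above can be modelled in scalar code. This is a minimal sketch and not part of the commit: `Lane`, `permutex2varByte`, and `lookup256` are hypothetical names, and the per-byte model assumes the documented behaviour of `_mm512_permutex2var_epi8` (bit 6 of the index selects the second table, the low 6 bits select the byte within it).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One 64-byte "register" (hypothetical stand-in for a __m512i).
using Lane = std::array<uint8_t, 64>;

// Scalar model of one output byte of _mm512_permutex2var_epi8(a, idx, b):
// bit 6 of idx picks table a or b, the low 6 bits pick the byte.
inline uint8_t permutex2varByte(const Lane & a, uint8_t idx, const Lane & b)
{
    const Lane & src = (idx & 0x40) ? b : a;
    return src[idx & 0x3F];
}

// Scalar model of the "two permutes plus MSB blend" lookup for tables of up to 256 bytes.
inline std::vector<uint8_t> lookup256(const std::vector<uint8_t> & table, const std::vector<uint8_t> & indexes)
{
    // Four zero-filled lanes; bytes past the table size stay zero, like a masked load.
    std::array<Lane, 4> t{};
    for (size_t i = 0; i < table.size() && i < 256; ++i)
        t[i / 64][i % 64] = table[i];

    std::vector<uint8_t> out(indexes.size());
    for (size_t i = 0; i < indexes.size(); ++i)
    {
        const uint8_t idx = indexes[i];
        const uint8_t low = permutex2varByte(t[0], idx, t[1]);   // covers indexes 0..127
        const uint8_t high = permutex2varByte(t[2], idx, t[3]);  // covers indexes 128..255
        out[i] = (idx & 0x80) ? high : low;                      // blend on the index MSB
    }
    return out;
}
```

For any index smaller than the table size, `lookup256(table, indexes)[i]` should equal `table[indexes[i]]`, which is what the default scalar loop computes.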

View File

@ -1,10 +1,10 @@
#include <limits>
#include <typeinfo> #include <typeinfo>
#include <vector> #include <vector>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
using namespace DB; using namespace DB;
static pcg64 rng(randomSeed()); static pcg64 rng(randomSeed());
@ -76,7 +76,6 @@ static void testFilter()
} }
} }
TEST(ColumnVector, Filter) TEST(ColumnVector, Filter)
{ {
testFilter<UInt8>(); testFilter<UInt8>();
@ -89,3 +88,71 @@ TEST(ColumnVector, Filter)
testFilter<Float64>(); testFilter<Float64>();
testFilter<UUID>(); testFilter<UUID>();
} }
template <typename T>
static MutableColumnPtr createIndexColumn(size_t limit, size_t rows)
{
auto column = ColumnVector<T>::create();
auto & values = column->getData();
auto max = std::numeric_limits<T>::max();
limit = limit > max ? max : limit;
for (size_t i = 0; i < rows; ++i)
{
T val = rng() % limit;
values.push_back(val);
}
return column;
}
template <typename T, typename IndexType>
static void testIndex()
{
static const std::vector<size_t> column_sizes = {64, 128, 196, 256, 512};
auto test_case = [&](size_t rows, size_t index_rows, size_t limit)
{
auto vector_column = createColumn<T>(rows);
auto index_column = createIndexColumn<IndexType>(rows, index_rows);
auto res_column = vector_column->index(*index_column, limit);
if (limit == 0)
limit = index_column->size();
/// check results
if (limit != res_column->size())
throw Exception(error_code, "ColumnVector index size not match to limit: {} {}", typeid(T).name(), typeid(IndexType).name());
for (size_t i = 0; i < limit; ++i)
{
/// vector_column data is the same as its index, so the indexed column's values will equal index_column's values.
if (res_column->get64(i) != index_column->get64(i))
throw Exception(error_code, "ColumnVector index fail: {} {}", typeid(T).name(), typeid(IndexType).name());
}
};
try
{
for (size_t i = 0; i < TEST_RUNS; ++i)
{
/// make sure rows are distributed in (column_sizes[r-1], column_sizes[r]]
size_t row_idx = rng() % column_sizes.size();
size_t row_base = row_idx > 0 ? column_sizes[row_idx - 1] : 0;
size_t rows = row_base + (rng() % (column_sizes[row_idx] - row_base) + 1);
size_t index_rows = rng() % MAX_ROWS + 1;
test_case(rows, index_rows, 0);
test_case(rows, index_rows, static_cast<size_t>(0.5 * index_rows));
}
}
catch (const Exception & e)
{
FAIL() << e.displayText();
}
}
TEST(ColumnVector, Index)
{
testIndex<UInt8, UInt8>();
testIndex<UInt16, UInt8>();
testIndex<UInt16, UInt16>();
}

View File

@ -0,0 +1,48 @@
#include <Common/EventNotifier.h>
#include <Common/Exception.h>
#include <boost/functional/hash.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::unique_ptr<EventNotifier> EventNotifier::event_notifier;
EventNotifier & EventNotifier::init()
{
if (event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is initialized twice. This is a bug.");
event_notifier = std::make_unique<EventNotifier>();
return *event_notifier;
}
EventNotifier & EventNotifier::instance()
{
if (!event_notifier)
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is not initialized. This is a bug.");
return *event_notifier;
}
void EventNotifier::shutdown()
{
if (event_notifier)
event_notifier.reset();
}
size_t EventNotifier::calculateIdentifier(size_t a, size_t b)
{
size_t result = 0;
boost::hash_combine(result, a);
boost::hash_combine(result, b);
return result;
}
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <vector>
#include <mutex>
#include <functional>
#include <set>
#include <map>
#include <memory>
#include <utility>
#include <iostream>
#include <base/types.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
class EventNotifier
{
public:
struct Handler
{
Handler(
EventNotifier & parent_,
size_t event_id_,
size_t callback_id_)
: parent(parent_)
, event_id(event_id_)
, callback_id(callback_id_)
{}
~Handler()
{
std::lock_guard lock(parent.mutex);
parent.callback_table[event_id].erase(callback_id);
parent.storage.erase(callback_id);
}
private:
EventNotifier & parent;
size_t event_id;
size_t callback_id;
};
using HandlerPtr = std::shared_ptr<Handler>;
static EventNotifier & init();
static EventNotifier & instance();
static void shutdown();
template <typename EventType, typename Callback>
[[ nodiscard ]] HandlerPtr subscribe(EventType event, Callback && callback)
{
std::lock_guard lock(mutex);
auto event_id = DefaultHash64(event);
auto callback_id = calculateIdentifier(event_id, ++counter);
callback_table[event_id].insert(callback_id);
storage[callback_id] = std::forward<Callback>(callback);
return std::make_shared<Handler>(*this, event_id, callback_id);
}
template <typename EventType>
void notify(EventType event)
{
std::lock_guard lock(mutex);
for (const auto & identifier : callback_table[DefaultHash64(event)])
storage[identifier]();
}
private:
// Defined in the .cpp file to keep the boost include out of this header
static size_t calculateIdentifier(size_t a, size_t b);
using CallbackType = std::function<void()>;
using CallbackStorage = std::map<size_t, CallbackType>;
using EventToCallbacks = std::map<size_t, std::set<size_t>>;
std::mutex mutex;
EventToCallbacks callback_table;
CallbackStorage storage;
size_t counter{0};
static std::unique_ptr<EventNotifier> event_notifier;
};
}
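
The subscribe/notify pattern in this header can be tried outside the ClickHouse tree with a reduced model. The sketch below is an illustration only, not the commit's code: `MiniNotifier` and its members are hypothetical names, events are plain `int` values instead of hashed event types, and there is no global instance. As in the header, a subscription ends when its handle is destroyed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>

class MiniNotifier
{
public:
    using Callback = std::function<void()>;

    // RAII handle: unsubscribes its callback on destruction.
    struct Handle
    {
        Handle(MiniNotifier & parent_, int event_, size_t id_) : parent(parent_), event(event_), id(id_) {}
        ~Handle()
        {
            std::lock_guard lock(parent.mutex);
            parent.callbacks[event].erase(id);
        }
        Handle(const Handle &) = delete;
        Handle & operator=(const Handle &) = delete;

    private:
        MiniNotifier & parent;
        int event;
        size_t id;
    };
    using HandlePtr = std::shared_ptr<Handle>;

    [[nodiscard]] HandlePtr subscribe(int event, Callback callback)
    {
        std::lock_guard lock(mutex);
        const size_t id = ++counter;
        callbacks[event][id] = std::move(callback);
        return std::make_shared<Handle>(*this, event, id);
    }

    // Callbacks run under the mutex, so in this sketch they must not
    // subscribe or drop handles themselves.
    void notify(int event)
    {
        std::lock_guard lock(mutex);
        auto it = callbacks.find(event);
        if (it == callbacks.end())
            return;
        for (auto & [id, callback] : it->second)
            callback();
    }

private:
    std::mutex mutex;
    std::map<int, std::map<size_t, Callback>> callbacks;
    size_t counter = 0;
};
```

Keeping the returned handle alive keeps the callback registered; resetting the handle or letting it go out of scope removes it, so a subscriber cannot be notified after it is destroyed as long as it owns its handle.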

View File

@ -139,12 +139,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
} }
} }
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_) ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{ {
zk_log = std::move(zk_log_);
init(args_); init(args_);
} }
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_) ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_)) : zk_log(std::move(zk_log_))
{ {

View File

@ -1,15 +1,16 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h> #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h> #include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/EventNotifier.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <Common/config.h> #include <Common/config.h>
@ -874,7 +875,11 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
/// No new requests will appear in queue after finish() /// No new requests will appear in queue after finish()
bool was_already_finished = requests_queue.finish(); bool was_already_finished = requests_queue.finish();
if (!was_already_finished) if (!was_already_finished)
{
active_session_metric_increment.destroy(); active_session_metric_increment.destroy();
/// Notify all subscribers (ReplicatedMergeTree tables) about expired session
EventNotifier::instance().notify(Error::ZSESSIONEXPIRED);
}
}; };
try try

View File

@ -0,0 +1,53 @@
#include <gtest/gtest.h>
#include <Common/EventNotifier.h>
#include <Common/ZooKeeper/IKeeper.h>
TEST(EventNotifier, SimpleTest)
{
using namespace DB;
size_t result = 1;
EventNotifier::init();
auto handler3 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 3; });
{
auto handler5 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 5; });
}
auto handler7 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 7; });
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 21);
result = 1;
handler3.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 7);
auto handler11 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 11; });
result = 1;
handler7.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 11);
EventNotifier::HandlerPtr handler13;
{
handler13 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 13; });
}
result = 1;
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 143);
result = 1;
handler11.reset();
handler13.reset();
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
ASSERT_EQ(result, 1);
EventNotifier::shutdown();
}

View File

@ -245,8 +245,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
credentials.setPassword(named_collection->configuration.password); credentials.setPassword(named_collection->configuration.password);
header_entries.reserve(named_collection->configuration.headers.size()); header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & header : named_collection->configuration.headers) for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>())); header_entries.emplace_back(std::make_tuple(key, value));
} }
else else
{ {

View File

@ -2,6 +2,7 @@
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/escapeForFileName.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -53,7 +54,31 @@ const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
{ {
return object_storage.files.contains(path); fs::path fs_path(path);
if (fs_path.has_extension())
fs_path = fs_path.parent_path();
initializeIfNeeded(fs_path, false);
if (object_storage.files.empty())
return false;
if (object_storage.files.contains(path))
return true;
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
auto it = std::lower_bound(
object_storage.files.begin(),
object_storage.files.end(),
path,
[](const auto & file, const std::string & path_) { return file.first < path_; }
);
if ((it != object_storage.files.end() && startsWith(it->first, path))
|| (it != object_storage.files.begin() && startsWith(std::prev(it)->first, path)))
return true;
return false;
} }
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
@ -98,7 +123,10 @@ uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & pat
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
{ {
assertExists(path); assertExists(path);
return {StoredObject::create(object_storage, path, object_storage.files.at(path).size, true)}; auto fs_path = fs::path(object_storage.url) / path;
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
} }
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const
@ -112,7 +140,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
return result; return result;
} }
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
{ {
if (object_storage.files.find(path) == object_storage.files.end()) if (object_storage.files.find(path) == object_storage.files.end())
{ {
@ -123,7 +151,7 @@ bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::stri
catch (...) catch (...)
{ {
const auto message = getCurrentExceptionMessage(false); const auto message = getCurrentExceptionMessage(false);
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext(); bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
if (can_throw) if (can_throw)
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message); throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
@ -140,13 +168,17 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
std::vector<fs::path> dir_file_paths; std::vector<fs::path> dir_file_paths;
if (!initializeIfNeeded(path)) if (!initializeIfNeeded(path))
{
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths)); return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
}
assertExists(path); assertExists(path);
for (const auto & [file_path, _] : object_storage.files) for (const auto & [file_path, _] : object_storage.files)
if (parentPath(file_path) == path) {
if (fs::path(parentPath(file_path)) / "" == fs::path(path) / "")
dir_file_paths.emplace_back(file_path); dir_file_paths.emplace_back(file_path);
}
LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size()); LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size());
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths)); return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));

View File

@ -19,7 +19,7 @@ private:
void assertExists(const std::string & path) const; void assertExists(const std::string & path) const;
bool initializeIfNeeded(const std::string & path) const; bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
public: public:
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_); explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);


@ -30,7 +30,6 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
extern const int NETWORK_ERROR; extern const int NETWORK_ERROR;
} }
@ -153,20 +152,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
std::optional<size_t>, std::optional<size_t>,
std::optional<size_t>) const std::optional<size_t>) const
{ {
const auto & path = object.absolute_path;
LOG_TRACE(log, "Read from path: {}", path);
auto iter = files.find(path);
if (iter == files.end())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
auto fs_path = fs::path(url) / path;
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.string().substr(url.size());
StoredObjects objects;
objects.emplace_back(remote_path, iter->second.size);
auto read_buffer_creator = auto read_buffer_creator =
[this, read_settings] [this, read_settings]
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase> (const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
@ -179,7 +164,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position); read_until_position);
}; };
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings); auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{ {


@ -114,7 +114,7 @@ protected:
size_t size = 0; size_t size = 0;
}; };
using Files = std::unordered_map<String, FileData>; /// file path -> file data using Files = std::map<String, FileData>; /// file path -> file data
mutable Files files; mutable Files files;
String url; String url;


@ -274,6 +274,8 @@ void PocoHTTPClient::makeRequestInternal(
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true); session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
} }
/// In case of error this address will be written to logs
request.SetResolvedRemoteHost(session->getResolvedAddress());
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1); Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);


@ -114,6 +114,7 @@ struct URI
bool is_virtual_hosted_style; bool is_virtual_hosted_style;
explicit URI(const Poco::URI & uri_); explicit URI(const Poco::URI & uri_);
explicit URI(const std::string & uri_) : URI(Poco::URI(uri_)) {}
static void validateBucket(const String & bucket, const Poco::URI & uri); static void validateBucket(const String & bucket, const Poco::URI & uri);
}; };


@ -100,9 +100,14 @@ BlockIO InterpreterCreateUserQuery::execute()
auto & access_control = getContext()->getAccessControl(); auto & access_control = getContext()->getAccessControl();
auto access = getContext()->getAccess(); auto access = getContext()->getAccess();
access->checkAccess(query.alter ? AccessType::ALTER_USER : AccessType::CREATE_USER); access->checkAccess(query.alter ? AccessType::ALTER_USER : AccessType::CREATE_USER);
bool implicit_no_password_allowed = access_control.isImplicitNoPasswordAllowed();
bool no_password_allowed = access_control.isNoPasswordAllowed(); bool no_password_allowed = access_control.isNoPasswordAllowed();
bool plaintext_password_allowed = access_control.isPlaintextPasswordAllowed(); bool plaintext_password_allowed = access_control.isPlaintextPasswordAllowed();
if (!query.attach && !query.alter && !query.auth_data && !implicit_no_password_allowed)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Authentication type NO_PASSWORD must be explicitly specified, check the setting allow_implicit_no_password in the server configuration");
std::optional<RolesOrUsersSet> default_roles_from_query; std::optional<RolesOrUsersSet> default_roles_from_query;
if (query.default_roles) if (query.default_roles)
{ {


@ -8,6 +8,7 @@
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/EventNotifier.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
@ -510,6 +511,7 @@ void Context::initGlobal()
assert(!global_context_instance); assert(!global_context_instance);
global_context_instance = shared_from_this(); global_context_instance = shared_from_this();
DatabaseCatalog::init(shared_from_this()); DatabaseCatalog::init(shared_from_this());
EventNotifier::init();
} }
SharedContextHolder Context::createShared() SharedContextHolder Context::createShared()


@ -1,15 +1,20 @@
#include <Processors/Formats/InputFormatErrorsLogger.h> #include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h> #include <Common/filesystemHelpers.h>
#include <Processors/Formats/IRowOutputFormat.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int DATABASE_ACCESS_DENIED;
}
namespace namespace
{ {
const String DEFAULT_OUTPUT_FORMAT = "CSV"; const String DEFAULT_OUTPUT_FORMAT = "CSV";
@ -26,8 +31,19 @@ InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
database = context->getInsertionTable().getDatabaseName(); database = context->getInsertionTable().getDatabaseName();
String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path; String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path;
errors_file_path = context->getApplicationType() == Context::ApplicationType::SERVER ? context->getUserFilesPath() + path_in_setting
: path_in_setting; if (context->getApplicationType() == Context::ApplicationType::SERVER)
{
auto user_files_path = context->getUserFilesPath();
errors_file_path = fs::path(user_files_path) / path_in_setting;
if (!fileOrSymlinkPathStartsWith(errors_file_path, user_files_path))
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Cannot log errors in path `{}`, because it is not inside `{}`", errors_file_path, user_files_path);
}
else
{
errors_file_path = path_in_setting;
}
while (fs::exists(errors_file_path)) while (fs::exists(errors_file_path))
{ {
errors_file_path += "_new"; errors_file_path += "_new";
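
The server branch above joins the user files directory with the configured path and then rejects results that fall outside that directory. `fileOrSymlinkPathStartsWith` is a ClickHouse helper whose implementation is not shown in this diff, so the following is only a sketch of the general idea, assuming a purely lexical check on a POSIX system; `lexicallyStartsWith` is a hypothetical name, and unlike the real helper it does not resolve symlinks:

```cpp
#include <algorithm>
#include <cassert>
#include <filesystem>

namespace fs = std::filesystem;

/// Hypothetical stand-in for a "path is inside prefix" check.
/// Purely lexical: it never touches the filesystem and ignores symlinks.
bool lexicallyStartsWith(const fs::path & path, const fs::path & prefix)
{
    fs::path normalized_path = path.lexically_normal();
    fs::path normalized_prefix = prefix.lexically_normal();

    /// A trailing separator produces an empty last component; drop it so that
    /// "/var/user_files/" and "/var/user_files" behave the same.
    if (!normalized_prefix.has_filename() && normalized_prefix.has_relative_path())
        normalized_prefix = normalized_prefix.parent_path();

    /// Compare component by component, so "/var/user_files_other" is not
    /// mistaken for a child of "/var/user_files".
    auto mismatch = std::mismatch(
        normalized_prefix.begin(), normalized_prefix.end(),
        normalized_path.begin(), normalized_path.end());
    return mismatch.first == normalized_prefix.end();
}
```

The check matters because `fs::path::operator/` does not guarantee containment: an absolute right-hand side replaces the left-hand side entirely, and `..` components can climb out of the base directory.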


@ -307,7 +307,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
{ {
const auto header_prefix = headers_prefix + header; const auto header_prefix = headers_prefix + header;
configuration.headers.emplace_back( configuration.headers.emplace_back(
std::make_pair(headers_config->getString(header_prefix + ".name"), headers_config->getString(header_prefix + ".value"))); headers_config->getString(header_prefix + ".name"),
headers_config->getString(header_prefix + ".value"));
} }
} }
@ -446,7 +447,9 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
for (const auto & header : header_keys) for (const auto & header : header_keys)
{ {
const auto header_prefix = config_prefix + ".headers." + header; const auto header_prefix = config_prefix + ".headers." + header;
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value"))); configuration.headers.emplace_back(
config.getString(header_prefix + ".name"),
config.getString(header_prefix + ".value"));
} }
} }
else else


@ -3,6 +3,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Storages/StorageS3Settings.h> #include <Storages/StorageS3Settings.h>
#include <Storages/HeaderCollection.h>
namespace DB namespace DB
@ -108,7 +109,7 @@ struct URLBasedDataSourceConfiguration
String user; String user;
String password; String password;
std::vector<std::pair<String, Field>> headers; HeaderCollection headers;
String http_method; String http_method;
void set(const URLBasedDataSourceConfiguration & conf); void set(const URLBasedDataSourceConfiguration & conf);


@ -0,0 +1,18 @@
#pragma once
#include <string>
namespace DB
{
struct HttpHeader
{
std::string name;
std::string value;
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
}
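
The new header gives `HttpHeader` a two-argument constructor, which is what lets the call sites in this change write `headers.emplace_back(name, value)` instead of building a `std::pair`. A small self-contained sketch of that usage follows. The struct is copied from the hunk above; `appendHeaders` is a hypothetical helper that mirrors how `updateS3Configuration` appends `headers_from_ast` after the config headers:

```cpp
#include <cassert>
#include <string>
#include <vector>

struct HttpHeader
{
    std::string name;
    std::string value;

    HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
    bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};

using HeaderCollection = std::vector<HttpHeader>;

/// Hypothetical helper (not in the diff): returns `base` followed by every
/// header in `extra`, keeping the original order of both collections.
HeaderCollection appendHeaders(HeaderCollection base, const HeaderCollection & extra)
{
    base.insert(base.end(), extra.begin(), extra.end());
    return base;
}
```

Note that this sketch does not deduplicate: if the same header name appears in both collections, both entries are kept.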


@ -1467,6 +1467,8 @@ bool MutateTask::execute()
} }
case State::NEED_EXECUTE: case State::NEED_EXECUTE:
{ {
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (task->executeStep()) if (task->executeStep())
return true; return true;


@ -494,9 +494,18 @@ void StorageKeeperMap::drop()
checkTable<true>(); checkTable<true>();
auto client = getClient(); auto client = getClient();
client->remove(table_path); // we allow ZNONODE in case we got hardware error on previous drop
if (auto code = client->tryRemove(table_path); code == Coordination::Error::ZNOTEMPTY)
{
throw zkutil::KeeperException(
code, "{} contains children which shouldn't happen. Please DETACH the table if you want to delete it", table_path);
}
if (!client->getChildren(tables_path).empty()) std::vector<std::string> children;
// if the tables_path is not found, some other table removed it
// if there are children, some other tables are still using this path as storage
if (auto code = client->tryGetChildren(tables_path, children);
code != Coordination::Error::ZOK || !children.empty())
return; return;
Coordination::Requests ops; Coordination::Requests ops;


@ -4185,6 +4185,11 @@ void StorageReplicatedMergeTree::startupImpl()
/// In this thread replica will be activated. /// In this thread replica will be activated.
restarting_thread.start(); restarting_thread.start();
/// And this is just a callback
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
{
restarting_thread.start();
});
/// Wait while restarting_thread finishing initialization. /// Wait while restarting_thread finishing initialization.
/// NOTE It does not mean that replication is actually started after receiving this event. /// NOTE It does not mean that replication is actually started after receiving this event.
@ -4228,6 +4233,8 @@ void StorageReplicatedMergeTree::shutdown()
if (shutdown_called.exchange(true)) if (shutdown_called.exchange(true))
return; return;
session_expired_callback_handler.reset();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever(); fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever();
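
The two hunks above subscribe a callback in `startupImpl()` and drop the returned handler in `shutdown()`. `EventNotifier` itself is not part of this section, so the sketch below is an assumption about the general pattern rather than its real implementation: a toy, single-threaded notifier whose handler unsubscribes when it is destroyed. `ToyNotifier` and all of its members are hypothetical names, and the real `HandlerPtr` may be a different smart pointer type:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>

/// Toy, single-threaded notifier (hypothetical, not ClickHouse's EventNotifier).
class ToyNotifier
{
public:
    using Callback = std::function<void()>;

    /// RAII subscription: destroying the handler removes the callback.
    /// The handler must not outlive the notifier that created it.
    class Handler
    {
    public:
        Handler(ToyNotifier & owner_, int event_, std::size_t id_) : owner(owner_), event(event_), id(id_) {}
        ~Handler() { owner.callbacks[event].erase(id); }
        Handler(const Handler &) = delete;
        Handler & operator=(const Handler &) = delete;

    private:
        ToyNotifier & owner;
        int event;
        std::size_t id;
    };

    using HandlerPtr = std::unique_ptr<Handler>;

    /// Registers `callback` for `event` and returns the owning handler.
    HandlerPtr subscribe(int event, Callback callback)
    {
        std::size_t id = next_id++;
        callbacks[event].emplace(id, std::move(callback));
        return std::make_unique<Handler>(*this, event, id);
    }

    /// Invokes every callback currently subscribed to `event`.
    void notify(int event)
    {
        auto it = callbacks.find(event);
        if (it == callbacks.end())
            return;
        for (auto & entry : it->second)
            entry.second();
    }

private:
    std::map<int, std::map<std::size_t, Callback>> callbacks;
    std::size_t next_id = 0;
};
```

Resetting the handler before the owning object is torn down, as `shutdown()` does with `session_expired_callback_handler.reset()`, ensures the callback capturing `this` can no longer fire afterwards.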


@ -29,6 +29,7 @@
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Throttler.h> #include <Common/Throttler.h>
#include <Common/EventNotifier.h>
#include <base/defines.h> #include <base/defines.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
@ -453,6 +454,7 @@ private:
/// A thread that processes reconnection to ZooKeeper when the session expires. /// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread; ReplicatedMergeTreeRestartingThread restarting_thread;
EventNotifier::HandlerPtr session_expired_callback_handler;
/// A thread that attaches the table using ZooKeeper /// A thread that attaches the table using ZooKeeper
std::optional<ReplicatedMergeTreeAttachThread> attach_thread; std::optional<ReplicatedMergeTreeAttachThread> attach_thread;


@ -27,6 +27,7 @@
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h> #include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <IO/ReadBufferFromS3.h> #include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h> #include <IO/WriteBufferFromS3.h>
@ -767,33 +768,28 @@ private:
StorageS3::StorageS3( StorageS3::StorageS3(
const S3::URI & uri_, const StorageS3Configuration & configuration_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageID & table_id_, const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const String & comment, const String & comment,
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
const String & compression_method_,
bool distributed_processing_, bool distributed_processing_,
ASTPtr partition_by_) ASTPtr partition_by_)
: IStorage(table_id_) : IStorage(table_id_)
, s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later , s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, keys({uri_.key}) , keys({s3_configuration.uri.key})
, format_name(format_name_) , format_name(configuration_.format)
, compression_method(compression_method_) , compression_method(configuration_.compression_method)
, name(uri_.storage_name) , name(s3_configuration.uri.storage_name)
, distributed_processing(distributed_processing_) , distributed_processing(distributed_processing_)
, format_settings(format_settings_) , format_settings(format_settings_)
, partition_by(partition_by_) , partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos) , is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos)
{ {
FormatFactory::instance().checkFormatName(format_name); FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri); context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri);
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, s3_configuration); updateS3Configuration(context_, s3_configuration);
@ -1062,47 +1058,48 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd) void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{ {
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString()); auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
const auto & config_rw_settings = settings.rw_settings;
bool need_update_configuration = settings != S3Settings{}; if (upd.rw_settings != config_rw_settings)
if (need_update_configuration) upd.rw_settings = settings.rw_settings;
{
if (upd.rw_settings != settings.rw_settings)
upd.rw_settings = settings.rw_settings;
}
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings()); upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
if (upd.client && (!upd.access_key_id.empty() || settings.auth_settings == upd.auth_settings)) if (upd.client)
return;
Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
HeaderCollection headers;
if (upd.access_key_id.empty())
{ {
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key); if (upd.static_configuration)
headers = settings.auth_settings.headers; return;
if (settings.auth_settings == upd.auth_settings)
return;
} }
upd.auth_settings.updateFrom(settings.auth_settings);
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region, upd.auth_settings.region,
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects, ctx->getRemoteHostFilter(),
ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false); /* for_disk_s3 = */ false);
client_configuration.endpointOverride = upd.uri.endpoint; client_configuration.endpointOverride = upd.uri.endpoint;
client_configuration.maxConnections = upd.rw_settings.max_connections; client_configuration.maxConnections = upd.rw_settings.max_connections;
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
auto headers = upd.auth_settings.headers;
if (!upd.headers_from_ast.empty())
headers.insert(headers.end(), upd.headers_from_ast.begin(), upd.headers_from_ast.end());
upd.client = S3::ClientFactory::instance().create( upd.client = S3::ClientFactory::instance().create(
client_configuration, client_configuration,
upd.uri.is_virtual_hosted_style, upd.uri.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(), credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(), credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64, upd.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers), std::move(headers),
settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)), upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false))); upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
upd.auth_settings = std::move(settings.auth_settings);
} }
@ -1155,6 +1152,10 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", "Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);
for (auto & engine_arg : engine_args) for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
@ -1184,19 +1185,23 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
} }
ColumnsDescription StorageS3::getTableStructureFromData( ColumnsDescription StorageS3::getTableStructureFromData(
const String & format, const StorageS3Configuration & configuration,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
bool distributed_processing, bool distributed_processing,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos) std::unordered_map<String, S3::ObjectInfo> * object_infos)
{ {
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) }; S3Configuration s3_configuration{
configuration.url,
configuration.auth_settings,
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
configuration.headers};
updateS3Configuration(ctx, s3_configuration); updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
return getTableStructureFromDataImpl(
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
} }
ColumnsDescription StorageS3::getTableStructureFromDataImpl( ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -1308,25 +1313,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext()); format_settings = getFormatSettings(args.getContext());
} }
S3::URI s3_uri(Poco::URI(configuration.url));
ASTPtr partition_by; ASTPtr partition_by;
if (args.storage_def->partition_by) if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone(); partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3>( return std::make_shared<StorageS3>(
s3_uri, configuration,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
args.table_id, args.table_id,
configuration.format,
configuration.rw_settings,
args.columns, args.columns,
args.constraints, args.constraints,
args.comment, args.comment,
args.getContext(), args.getContext(),
format_settings, format_settings,
configuration.compression_method,
/* distributed_processing_ */false, /* distributed_processing_ */false,
partition_by); partition_by);
}, },


@ -149,18 +149,13 @@ class StorageS3 : public IStorage, WithContext
{ {
public: public:
StorageS3( StorageS3(
const S3::URI & uri, const StorageS3Configuration & configuration_,
const String & access_key_id,
const String & secret_access_key,
const StorageID & table_id_, const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const String & comment, const String & comment,
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
const String & compression_method_ = "",
bool distributed_processing_ = false, bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr); ASTPtr partition_by_ = nullptr);
@ -189,11 +184,7 @@ public:
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context); static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static ColumnsDescription getTableStructureFromData( static ColumnsDescription getTableStructureFromData(
const String & format, const StorageS3Configuration & configuration,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
bool distributed_processing, bool distributed_processing,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx,
@ -204,11 +195,28 @@ public:
struct S3Configuration struct S3Configuration
{ {
const S3::URI uri; const S3::URI uri;
const String access_key_id;
const String secret_access_key;
std::shared_ptr<const Aws::S3::S3Client> client; std::shared_ptr<const Aws::S3::S3Client> client;
S3Settings::AuthSettings auth_settings; S3Settings::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings; S3Settings::ReadWriteSettings rw_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HeaderCollection headers_from_ast;
S3Configuration(
const String & url_,
const S3Settings::AuthSettings & auth_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, rw_settings(rw_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
}; };
static SchemaCache & getSchemaCache(const ContextPtr & ctx); static SchemaCache & getSchemaCache(const ContextPtr & ctx);


@ -46,22 +46,17 @@
namespace DB namespace DB
{ {
StorageS3Cluster::StorageS3Cluster( StorageS3Cluster::StorageS3Cluster(
const String & filename_, const StorageS3ClusterConfiguration & configuration_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageID & table_id_, const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
ContextPtr context_, ContextPtr context_)
const String & compression_method_)
: IStorage(table_id_) : IStorage(table_id_)
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())} , s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(filename_) , filename(configuration_.url)
, cluster_name(cluster_name_) , cluster_name(configuration_.cluster_name)
, format_name(format_name_) , format_name(configuration_.format)
, compression_method(compression_method_) , compression_method(configuration_.compression_method)
{ {
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename}); context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;


@ -21,16 +21,11 @@ class StorageS3Cluster : public IStorage
{ {
public: public:
StorageS3Cluster( StorageS3Cluster(
const String & filename_, const StorageS3ClusterConfiguration & configuration_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageID & table_id_, const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
ContextPtr context_, ContextPtr context_);
const String & compression_method_);
std::string getName() const override { return "S3Cluster"; } std::string getName() const override { return "S3Cluster"; }


@ -7,6 +7,7 @@
#include <vector> #include <vector>
#include <base/types.h> #include <base/types.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Storages/HeaderCollection.h>
namespace Poco::Util namespace Poco::Util
{ {
@ -15,15 +16,6 @@ class AbstractConfiguration;
namespace DB namespace DB
{ {
struct HttpHeader
{
String name;
String value;
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
struct Settings; struct Settings;
@ -50,6 +42,21 @@ struct S3Settings
&& use_environment_credentials == other.use_environment_credentials && use_environment_credentials == other.use_environment_credentials
&& use_insecure_imds_request == other.use_insecure_imds_request; && use_insecure_imds_request == other.use_insecure_imds_request;
} }
void updateFrom(const AuthSettings & from)
{
/// Check for emptiness only those parameters that can be passed
/// via the AST as well as from the config; the rest are always overwritten.
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
}
}; };
struct ReadWriteSettings struct ReadWriteSettings
@ -94,7 +101,6 @@ struct S3Settings
class StorageS3Settings class StorageS3Settings
{ {
public: public:
StorageS3Settings() = default;
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings); void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
S3Settings getSettings(const String & endpoint) const; S3Settings getSettings(const String & endpoint) const;
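
The `updateFrom` method added above merges settings asymmetrically: the key pair is only taken from `from` when it is non-empty, while the remaining fields are overwritten unconditionally. A reduced sketch of that rule, assuming a toy struct with only three of the fields (`ToyAuthSettings` is hypothetical, and the literal values are placeholders):

```cpp
#include <cassert>
#include <string>

/// Reduced, hypothetical model of the merge rule in AuthSettings::updateFrom.
struct ToyAuthSettings
{
    std::string access_key_id;
    std::string secret_access_key;
    std::string region;

    void updateFrom(const ToyAuthSettings & from)
    {
        /// Credentials may also come from the query (AST), so an empty value
        /// in `from` must not erase what is already set.
        if (!from.access_key_id.empty())
            access_key_id = from.access_key_id;
        if (!from.secret_access_key.empty())
            secret_access_key = from.secret_access_key;

        /// Config-only parameters are always overwritten, even with an empty value.
        region = from.region;
    }
};
```

With this rule, credentials given in the query survive a merge with config settings that leave them blank, while `region` always follows the config.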


@ -1018,7 +1018,7 @@ ASTs::iterator StorageURL::collectHeaders(
if (arg_value.getType() != Field::Types::Which::String) if (arg_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
configuration.headers.emplace_back(arg_name, arg_value); configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
} }
headers_it = arg_it; headers_it = arg_it;
@ -1096,10 +1096,9 @@ void registerStorageURL(StorageFactory & factory)
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers) for (const auto & [header, value] : configuration.headers)
{ {
auto value_literal = value.safeGet<String>();
if (header == "Range") if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal)); headers.emplace_back(header, value);
} }
ASTPtr partition_by; ASTPtr partition_by;


@ -12,6 +12,7 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h> #include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include "registerTableFunctions.h" #include "registerTableFunctions.h"
#include <filesystem> #include <filesystem>
@ -40,6 +41,10 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.empty() || args.size() > 6) if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message); throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args) for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
@ -135,15 +140,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
if (configuration.structure == "auto") if (configuration.structure == "auto")
{ {
context->checkAccess(getSourceAccessType()); context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData( return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
} }
return parseColumnsListFromString(configuration.structure, context); return parseColumnsListFromString(configuration.structure, context);
@ -161,19 +158,14 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
columns = structure_hint; columns = structure_hint;
StoragePtr storage = std::make_shared<StorageS3>( StoragePtr storage = std::make_shared<StorageS3>(
s3_uri, configuration,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},
String{}, String{},
context, context,
/// No format_settings for table function S3 /// No format_settings for table function S3
std::nullopt, std::nullopt);
configuration.compression_method);
storage->startup(); storage->startup();


@ -82,19 +82,10 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
{ {
context->checkAccess(getSourceAccessType());
if (configuration.structure == "auto") if (configuration.structure == "auto")
{ return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
}
return parseColumnsListFromString(configuration.structure, context); return parseColumnsListFromString(configuration.structure, context);
} }
@ -104,46 +95,38 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
const std::string & table_name, ColumnsDescription /*cached_columns*/) const const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{ {
StoragePtr storage; StoragePtr storage;
ColumnsDescription columns; ColumnsDescription columns;
if (configuration.structure != "auto") if (configuration.structure != "auto")
{
columns = parseColumnsListFromString(configuration.structure, context); columns = parseColumnsListFromString(configuration.structure, context);
}
else if (!structure_hint.empty()) else if (!structure_hint.empty())
{
columns = structure_hint; columns = structure_hint;
}
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
/// On a worker node this filename won't contain globs
Poco::URI uri (configuration.url);
S3::URI s3_uri (uri);
storage = std::make_shared<StorageS3>( storage = std::make_shared<StorageS3>(
s3_uri, configuration,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},
String{}, /* comment */String{},
context, context,
// No format_settings for S3Cluster /* format_settings */std::nullopt, /// No format_settings for S3Cluster
std::nullopt,
configuration.compression_method,
/*distributed_processing=*/true); /*distributed_processing=*/true);
} }
else else
{ {
storage = std::make_shared<StorageS3Cluster>( storage = std::make_shared<StorageS3Cluster>(
configuration.url, configuration,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
configuration.cluster_name, configuration.format,
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},
context, context);
configuration.compression_method);
} }
storage->startup(); storage->startup();


@ -103,10 +103,9 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers) for (const auto & [header, value] : configuration.headers)
{ {
auto value_literal = value.safeGet<String>();
if (header == "Range") if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed"); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal)); headers.emplace_back(header, value);
} }
return headers; return headers;
} }
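
The header handling in the hunk above (copy each configured name/value pair, but refuse `Range`) can be sketched outside ClickHouse as follows. This is only an illustrative Python sketch under stated assumptions, not the project's code: `build_http_headers` is a hypothetical stand-in for `getHeaders()`, and `ValueError` stands in for `Exception(ErrorCodes::BAD_ARGUMENTS, ...)`.

```python
from typing import Iterable, List, Tuple

HeaderEntries = List[Tuple[str, str]]


def build_http_headers(configured: Iterable[Tuple[str, str]]) -> HeaderEntries:
    """Copy configured (name, value) pairs, rejecting the Range header.

    Values are assumed to be plain strings already, as in the new side of
    the diff, so no per-entry conversion happens here.
    """
    headers: HeaderEntries = []
    for name, value in configured:
        if name == "Range":
            # Hypothetical analogue of BAD_ARGUMENTS in the C++ code.
            raise ValueError("Range headers are not allowed")
        headers.append((name, value))
    return headers
```

Like the C++ loop, the sketch compares the header name exactly, so only the literal spelling `Range` is rejected.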


@ -25,19 +25,17 @@ def cluster():
global uuids global uuids
for i in range(3): for i in range(3):
node1.query( node1.query(
""" CREATE TABLE data{} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def';""".format( f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
i
)
) )
node1.query(
"INSERT INTO data{} SELECT number FROM numbers(500000 * {})".format( for _ in range(10):
i, i + 1 node1.query(
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
) )
) expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
expected = node1.query("SELECT * FROM data{} ORDER BY id".format(i))
metadata_path = node1.query( metadata_path = node1.query(
"SELECT data_paths FROM system.tables WHERE name='data{}'".format(i) f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
) )
metadata_path = metadata_path[ metadata_path = metadata_path[
metadata_path.find("/") : metadata_path.rfind("/") + 1 metadata_path.find("/") : metadata_path.rfind("/") + 1
@ -84,7 +82,7 @@ def test_usage(cluster, node_name):
result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i)) result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i))
result = node2.query("SELECT count() FROM test{}".format(i)) result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1) assert int(result) == 5000000 * (i + 1)
result = node2.query( result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i) "SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
@ -123,7 +121,7 @@ def test_incorrect_usage(cluster):
) )
result = node2.query("SELECT count() FROM test0") result = node2.query("SELECT count() FROM test0")
assert int(result) == 500000 assert int(result) == 5000000
result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first") result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first")
assert "Table is read-only" in result assert "Table is read-only" in result
@ -169,7 +167,7 @@ def test_cache(cluster, node_name):
assert int(result) > 0 assert int(result) > 0
result = node2.query("SELECT count() FROM test{}".format(i)) result = node2.query("SELECT count() FROM test{}".format(i))
assert int(result) == 500000 * (i + 1) assert int(result) == 5000000 * (i + 1)
result = node2.query( result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i) "SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)


@ -3,6 +3,12 @@ import time
import os import os
import pytest import pytest
# FIXME Test is temporarily disabled due to flakiness
# https://github.com/ClickHouse/ClickHouse/issues/39700
pytestmark = pytest.mark.skip
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread from helpers.utility import generate_values, replace_config, SafeThread


@ -110,6 +110,10 @@ def started_cluster():
main_configs=["configs/defaultS3.xml"], main_configs=["configs/defaultS3.xml"],
user_configs=["configs/s3_max_redirects.xml"], user_configs=["configs/s3_max_redirects.xml"],
) )
cluster.add_instance(
"s3_non_default",
with_minio=True,
)
logging.info("Starting cluster...") logging.info("Starting cluster...")
cluster.start() cluster.start()
logging.info("Cluster started") logging.info("Cluster started")
@ -1689,3 +1693,22 @@ def test_schema_inference_cache(started_cluster):
test("s3") test("s3")
test("url") test("url")
def test_ast_auth_headers(started_cluster):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["s3_non_default"] # type: ClickHouseInstance
filename = "test.csv"
result = instance.query_and_get_error(
f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV')"
)
assert "Forbidden Error" in result
assert "S3_ERROR" in result
result = instance.query(
f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
)
assert result.strip() == "1\t2\t3"


@ -0,0 +1,22 @@
<clickhouse>
<logger>
<level>trace</level>
<console>true</console>
</logger>
<tcp_port>9000</tcp_port>
<allow_implicit_no_password>0</allow_implicit_no_password>
<path>.</path>
<mark_cache_size>0</mark_cache_size>
<!-- Sources to read users, roles, access rights, profiles of settings, quotas. -->
<user_directories>
<users_xml>
<!-- Path to configuration file with predefined users. -->
<path>users.xml</path>
</users_xml>
<local_directory>
<!-- Path to folder where users created by SQL commands are stored. -->
<path>./</path>
</local_directory>
</user_directories>
</clickhouse>


@ -0,0 +1,85 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-parallel, no-fasttest
# Tag no-tsan: requires jemalloc to track small allocations
# Tag no-asan: requires jemalloc to track small allocations
# Tag no-ubsan: requires jemalloc to track small allocations
# Tag no-msan: requires jemalloc to track small allocations
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
cp /etc/clickhouse-server/users.xml "$CURDIR"/users.xml
sed -i 's/<password><\/password>/<password_sha256_hex>c64c5e4e53ea1a9f1427d2713b3a22bbebe8940bc807adaf654744b1568c70ab<\/password_sha256_hex>/g' "$CURDIR"/users.xml
sed -i 's/<!-- <access_management>1<\/access_management> -->/<access_management>1<\/access_management>/g' "$CURDIR"/users.xml
server_opts=(
"--config-file=$CURDIR/$(basename "${BASH_SOURCE[0]}" .sh).config.xml"
"--"
# to avoid multiple listen sockets (complexity for port discovering)
"--listen_host=127.1"
# we will discover the real port later.
"--tcp_port=0"
"--shutdown_wait_unfinished=0"
)
CLICKHOUSE_WATCHDOG_ENABLE=0 $CLICKHOUSE_SERVER_BINARY "${server_opts[@]}" &> clickhouse-server.stderr &
server_pid=$!
server_port=
i=0 retries=300
# wait until the server starts listening (max 30 seconds)
while [[ -z $server_port ]] && [[ $i -lt $retries ]]; do
server_port=$(lsof -n -a -P -i tcp -s tcp:LISTEN -p $server_pid 2>/dev/null | awk -F'[ :]' '/LISTEN/ { print $(NF-1) }')
((++i))
sleep 0.1
if ! kill -0 $server_pid >& /dev/null; then
echo "No server (pid $server_pid)"
break
fi
done
if [[ -z $server_port ]]; then
echo "Cannot wait for LISTEN socket" >&2
exit 1
fi
# wait for the server to start accepting tcp connections (max 30 seconds)
i=0 retries=300
while ! $CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" --format Null -q 'select 1' 2>/dev/null && [[ $i -lt $retries ]]; do
((++i))
sleep 0.1
if ! kill -0 $server_pid >& /dev/null; then
echo "No server (pid $server_pid)"
break
fi
done
if ! $CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" --format Null -q 'select 1'; then
echo "Cannot wait until server will start accepting connections on <tcp_port>" >&2
exit 1
fi
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "DROP USER IF EXISTS u1_02422, u2_02422, u3_02422";
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u1_02422" " -- { serverError 516 } --" &> /dev/null ;
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u2_02422 IDENTIFIED WITH no_password "
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u3_02422 IDENTIFIED BY 'qwe123'";
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "DROP USER u2_02422, u3_02422";
# no sleep, since flushing to stderr should not be buffered.
grep 'User is not allowed to Create users' clickhouse-server.stderr
# send TERM and save the error code to ensure that it is 0 (EXIT_SUCCESS)
kill $server_pid
wait $server_pid
return_code=$?
rm -f clickhouse-server.stderr
rm -f "$CURDIR"/users.xml
exit $return_code
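
The script above waits for the server with bounded polling: up to 300 attempts, 0.1 s apart, stopping early if the server process has died. A minimal Python sketch of that pattern is given here for illustration only; `wait_until`, `ready` and `still_alive` are hypothetical names, not part of the ClickHouse test helpers.

```python
import time
from typing import Callable


def wait_until(
    ready: Callable[[], bool],
    retries: int = 300,
    delay: float = 0.1,
    still_alive: Callable[[], bool] = lambda: True,
) -> bool:
    """Poll `ready` at most `retries` times, sleeping `delay` seconds between tries.

    Gives up early when `still_alive` reports that the watched process is gone,
    and performs one last check after the loop, as the shell script does.
    """
    for _ in range(retries):
        if ready():
            return True
        if not still_alive():
            return False
        time.sleep(delay)
    return ready()
```

The attempt counter is what bounds the wait at roughly `retries * delay` seconds, so it has to advance on every iteration.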


@ -0,0 +1,3 @@
insert into function file(02453_data.jsonl, TSV) select 1 settings engine_file_truncate_on_insert=1;
select * from file(02453_data.jsonl, auto, 'x UInt32') settings input_format_allow_errors_num=1, input_format_record_errors_file_path='../error_file'; -- {serverError DATABASE_ACCESS_DENIED}