mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge branch 'master' into fix_part_removal_retries
This commit is contained in:
commit
5852df7f65
@ -163,7 +163,7 @@
|
||||
* Fix `base58Encode / base58Decode` handling leading 0 / '1'. [#40620](https://github.com/ClickHouse/ClickHouse/pull/40620) ([Andrey Zvonov](https://github.com/zvonand)).
|
||||
* keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)).
|
||||
* Fix short circuit execution of toFixedString function. Solves (partially) [#40622](https://github.com/ClickHouse/ClickHouse/issues/40622). [#40628](https://github.com/ClickHouse/ClickHouse/pull/40628) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* - Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
|
||||
* Fixes SQLite int8 column conversion to int64 column in ClickHouse. Fixes [#40639](https://github.com/ClickHouse/ClickHouse/issues/40639). [#40642](https://github.com/ClickHouse/ClickHouse/pull/40642) ([Barum Rho](https://github.com/barumrho)).
|
||||
* Fix stack overflow in recursive `Buffer` tables. This closes [#40637](https://github.com/ClickHouse/ClickHouse/issues/40637). [#40643](https://github.com/ClickHouse/ClickHouse/pull/40643) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
|
||||
* During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)).
|
||||
* Fix LOGICAL_ERROR with max_read_buffer_size=0 during reading marks. [#40705](https://github.com/ClickHouse/ClickHouse/pull/40705) ([Azat Khuzhin](https://github.com/azat)).
|
||||
@ -174,7 +174,7 @@
|
||||
* In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)).
|
||||
* Fix incremental backups for Log family. [#40827](https://github.com/ClickHouse/ClickHouse/pull/40827) ([Vitaly Baranov](https://github.com/vitlibar)).
|
||||
* Fix extremely rare bug which can lead to potential data loss in zero-copy replication. [#40844](https://github.com/ClickHouse/ClickHouse/pull/40844) ([alesapin](https://github.com/alesapin)).
|
||||
* - Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
|
||||
* Fix key condition analyzing crashes when same set expression built from different column(s). [#40850](https://github.com/ClickHouse/ClickHouse/pull/40850) ([Duc Canh Le](https://github.com/canhld94)).
|
||||
* Fix nested JSON Objects schema inference. [#40851](https://github.com/ClickHouse/ClickHouse/pull/40851) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Fix 3-digit prefix directory for filesystem cache files not being deleted if empty. Closes [#40797](https://github.com/ClickHouse/ClickHouse/issues/40797). [#40867](https://github.com/ClickHouse/ClickHouse/pull/40867) ([Kseniia Sumarokova](https://github.com/kssenii)).
|
||||
* Fix uncaught DNS_ERROR on failed connection to replicas. [#40881](https://github.com/ClickHouse/ClickHouse/pull/40881) ([Robert Coelho](https://github.com/coelho)).
|
||||
|
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit 9fec8e11dbb6a352e1cfba8cc9e23ebd7fb77310
|
||||
Subproject commit 76746b35d0e254eaaba71dc3b79e46cba8cbb144
|
@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
|
||||
# lts / testing / prestable / etc
|
||||
ARG REPO_CHANNEL="stable"
|
||||
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
|
||||
ARG VERSION="22.8.5.29"
|
||||
ARG VERSION="22.9.2.7"
|
||||
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
|
||||
|
||||
# user/group precreated explicitly with fixed uid/gid on purpose.
|
||||
|
@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
|
||||
|
||||
ARG REPO_CHANNEL="stable"
|
||||
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
|
||||
ARG VERSION="22.8.5.29"
|
||||
ARG VERSION="22.9.2.7"
|
||||
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
|
||||
|
||||
# set non-empty deb_location_url url to create a docker image
|
||||
|
@ -106,8 +106,8 @@ fi
|
||||
|
||||
if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then
|
||||
# port is needed to check if clickhouse-server is ready for connections
|
||||
HTTP_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=http_port)"
|
||||
HTTPS_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=https_port)"
|
||||
HTTP_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=http_port --try)"
|
||||
HTTPS_PORT="$(clickhouse extract-from-config --config-file "$CLICKHOUSE_CONFIG" --key=https_port --try)"
|
||||
|
||||
if [ -n "$HTTP_PORT" ]; then
|
||||
URL="http://127.0.0.1:$HTTP_PORT/ping"
|
||||
|
@ -243,7 +243,7 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
|
||||
configure
|
||||
|
||||
# But we still need default disk because some tables loaded only into it
|
||||
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<disk>s3</disk>|<disk>s3</disk><disk>default</disk>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
|
||||
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
|
||||
mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
|
20
docs/changelogs/v22.9.2.7-stable.md
Normal file
20
docs/changelogs/v22.9.2.7-stable.md
Normal file
@ -0,0 +1,20 @@
|
||||
---
|
||||
sidebar_position: 1
|
||||
sidebar_label: 2022
|
||||
---
|
||||
|
||||
# 2022 Changelog
|
||||
|
||||
### ClickHouse release v22.9.2.7-stable (362e2cefcef) FIXME as compared to v22.9.1.2603-stable (3030d4c7ff0)
|
||||
|
||||
#### Improvement
|
||||
* Backported in [#41709](https://github.com/ClickHouse/ClickHouse/issues/41709): Check file path for path traversal attacks in errors logger for input formats. [#41694](https://github.com/ClickHouse/ClickHouse/pull/41694) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
|
||||
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
|
||||
|
||||
* Backported in [#41696](https://github.com/ClickHouse/ClickHouse/issues/41696): Fixes issue when docker run will fail if "https_port" is not present in config. [#41693](https://github.com/ClickHouse/ClickHouse/pull/41693) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
|
||||
|
||||
#### NOT FOR CHANGELOG / INSIGNIFICANT
|
||||
|
||||
* Fix typos in JSON formats after [#40910](https://github.com/ClickHouse/ClickHouse/issues/40910) [#41614](https://github.com/ClickHouse/ClickHouse/pull/41614) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
|
@ -131,7 +131,7 @@ Example of configuration for versions later or equal to 22.8:
|
||||
<type>cache</type>
|
||||
<disk>s3</disk>
|
||||
<path>/s3_cache/</path>
|
||||
<max_size>10000000</max_size>
|
||||
<max_size>10Gi</max_size>
|
||||
</cache>
|
||||
</disks>
|
||||
<policies>
|
||||
@ -155,7 +155,7 @@ Example of configuration for versions earlier than 22.8:
|
||||
<endpoint>...</endpoint>
|
||||
... s3 configuration ...
|
||||
<data_cache_enabled>1</data_cache_enabled>
|
||||
<data_cache_max_size>10000000</data_cache_max_size>
|
||||
<data_cache_max_size>10737418240</data_cache_max_size>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
@ -172,7 +172,7 @@ Cache **configuration settings**:
|
||||
|
||||
- `path` - path to the directory with cache. Default: None, this setting is obligatory.
|
||||
|
||||
- `max_size` - maximum size of the cache in bytes. When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
|
||||
- `max_size` - maximum size of the cache in bytes or in readable format, e.g. `ki, Mi, Gi, etc`, example `10Gi` (such format works starting from `22.10` version). When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
|
||||
|
||||
- `cache_on_write_operations` - allow to turn on `write-through` cache (caching data on any write operations: `INSERT` queries, background merges). Default: `false`. The `write-through` cache can be disabled per query using setting `enable_filesystem_cache_on_write_operations` (data is cached only if both cache config settings and corresponding query setting are enabled).
|
||||
|
||||
@ -182,7 +182,7 @@ Cache **configuration settings**:
|
||||
|
||||
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `false`. This setting was added in version 22.8. If you used filesystem cache before this version, then it will not work on versions starting from 22.8 if this setting is set to `true`. If you want to use this setting, clear old cache created before version 22.8 before upgrading.
|
||||
|
||||
- `max_file_segment_size` - a maximum size of a single cache file. Default: `104857600` (100 Mb).
|
||||
- `max_file_segment_size` - a maximum size of a single cache file in bytes or in readable format (`ki, Mi, Gi, etc`, example `10Gi`). Default: `104857600` (`100Mi`).
|
||||
|
||||
- `max_elements` - a limit for a number of cache files. Default: `1048576`.
|
||||
|
||||
|
@ -462,8 +462,9 @@
|
||||
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
|
||||
|
||||
<!-- Disable AuthType plaintext_password and no_password for ACL. -->
|
||||
<!-- <allow_plaintext_password>0</allow_plaintext_password> -->
|
||||
<!-- <allow_no_password>0</allow_no_password> -->`
|
||||
<allow_plaintext_password>1</allow_plaintext_password>
|
||||
<allow_no_password>1</allow_no_password>
|
||||
<allow_implicit_no_password>1</allow_implicit_no_password>
|
||||
|
||||
<!-- Policy from the <storage_configuration> for the temporary files.
|
||||
If not set <tmp_path> is used, otherwise <tmp_path> is ignored.
|
||||
|
@ -162,6 +162,7 @@ void AccessControl::setUpFromMainConfig(const Poco::Util::AbstractConfiguration
|
||||
if (config_.has("custom_settings_prefixes"))
|
||||
setCustomSettingsPrefixes(config_.getString("custom_settings_prefixes"));
|
||||
|
||||
setImplicitNoPasswordAllowed(config_.getBool("allow_implicit_no_password", true));
|
||||
setNoPasswordAllowed(config_.getBool("allow_no_password", true));
|
||||
setPlaintextPasswordAllowed(config_.getBool("allow_plaintext_password", true));
|
||||
|
||||
@ -499,6 +500,15 @@ void AccessControl::checkSettingNameIsAllowed(const std::string_view setting_nam
|
||||
custom_settings_prefixes->checkSettingNameIsAllowed(setting_name);
|
||||
}
|
||||
|
||||
void AccessControl::setImplicitNoPasswordAllowed(bool allow_implicit_no_password_)
|
||||
{
|
||||
allow_implicit_no_password = allow_implicit_no_password_;
|
||||
}
|
||||
|
||||
bool AccessControl::isImplicitNoPasswordAllowed() const
|
||||
{
|
||||
return allow_implicit_no_password;
|
||||
}
|
||||
|
||||
void AccessControl::setNoPasswordAllowed(bool allow_no_password_)
|
||||
{
|
||||
|
@ -134,6 +134,11 @@ public:
|
||||
bool isSettingNameAllowed(const std::string_view name) const;
|
||||
void checkSettingNameIsAllowed(const std::string_view name) const;
|
||||
|
||||
/// Allows implicit user creation without password (by default it's allowed).
|
||||
/// In other words, allow 'CREATE USER' queries without 'IDENTIFIED WITH' clause.
|
||||
void setImplicitNoPasswordAllowed(const bool allow_implicit_no_password_);
|
||||
bool isImplicitNoPasswordAllowed() const;
|
||||
|
||||
/// Allows users without password (by default it's allowed).
|
||||
void setNoPasswordAllowed(const bool allow_no_password_);
|
||||
bool isNoPasswordAllowed() const;
|
||||
@ -222,6 +227,7 @@ private:
|
||||
std::unique_ptr<AccessChangesNotifier> changes_notifier;
|
||||
std::atomic_bool allow_plaintext_password = true;
|
||||
std::atomic_bool allow_no_password = true;
|
||||
std::atomic_bool allow_implicit_no_password = true;
|
||||
std::atomic_bool users_without_row_policies_can_read_rows = false;
|
||||
std::atomic_bool on_cluster_queries_require_cluster_grant = false;
|
||||
std::atomic_bool select_from_system_db_requires_grant = false;
|
||||
|
@ -7,11 +7,15 @@
|
||||
#include <base/unaligned.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/TargetSpecific.h>
|
||||
#include <Core/TypeId.h>
|
||||
#include <base/TypeName.h>
|
||||
|
||||
#include "config_core.h"
|
||||
|
||||
#if USE_MULTITARGET_CODE
|
||||
# include <immintrin.h>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -391,6 +395,124 @@ protected:
|
||||
Container data;
|
||||
};
|
||||
|
||||
DECLARE_DEFAULT_CODE(
|
||||
template <typename Container, typename Type>
|
||||
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
|
||||
{
|
||||
for (size_t i = 0; i < limit; ++i)
|
||||
res_data[i] = data[indexes[i]];
|
||||
}
|
||||
);
|
||||
|
||||
DECLARE_AVX512VBMI_SPECIFIC_CODE(
|
||||
template <typename Container, typename Type>
|
||||
inline void vectorIndexImpl(const Container & data, const PaddedPODArray<Type> & indexes, size_t limit, Container & res_data)
|
||||
{
|
||||
static constexpr UInt64 MASK64 = 0xffffffffffffffff;
|
||||
const size_t limit64 = limit & ~63;
|
||||
size_t pos = 0;
|
||||
size_t data_size = data.size();
|
||||
|
||||
auto data_pos = reinterpret_cast<const UInt8 *>(data.data());
|
||||
auto indexes_pos = reinterpret_cast<const UInt8 *>(indexes.data());
|
||||
auto res_pos = reinterpret_cast<UInt8 *>(res_data.data());
|
||||
|
||||
if (data_size <= 64)
|
||||
{
|
||||
/// one single mask load for table size <= 64
|
||||
__mmask64 last_mask = MASK64 >> (64 - data_size);
|
||||
__m512i table1 = _mm512_maskz_loadu_epi8(last_mask, data_pos);
|
||||
|
||||
/// 64 bytes table lookup using one single permutexvar_epi8
|
||||
while (pos < limit64)
|
||||
{
|
||||
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
|
||||
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
|
||||
_mm512_storeu_epi8(res_pos + pos, out);
|
||||
pos += 64;
|
||||
}
|
||||
/// tail handling
|
||||
if (limit > limit64)
|
||||
{
|
||||
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
|
||||
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
|
||||
__m512i out = _mm512_permutexvar_epi8(vidx, table1);
|
||||
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
|
||||
}
|
||||
}
|
||||
else if (data_size <= 128)
|
||||
{
|
||||
/// table size (64, 128] requires 2 zmm load
|
||||
__mmask64 last_mask = MASK64 >> (128 - data_size);
|
||||
__m512i table1 = _mm512_loadu_epi8(data_pos);
|
||||
__m512i table2 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 64);
|
||||
|
||||
/// 128 bytes table lookup using one single permute2xvar_epi8
|
||||
while (pos < limit64)
|
||||
{
|
||||
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
|
||||
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
|
||||
_mm512_storeu_epi8(res_pos + pos, out);
|
||||
pos += 64;
|
||||
}
|
||||
if (limit > limit64)
|
||||
{
|
||||
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
|
||||
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
|
||||
__m512i out = _mm512_permutex2var_epi8(table1, vidx, table2);
|
||||
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (data_size > 256)
|
||||
{
|
||||
/// byte index will not exceed 256 boundary.
|
||||
data_size = 256;
|
||||
}
|
||||
|
||||
__m512i table1 = _mm512_loadu_epi8(data_pos);
|
||||
__m512i table2 = _mm512_loadu_epi8(data_pos + 64);
|
||||
__m512i table3, table4;
|
||||
if (data_size <= 192)
|
||||
{
|
||||
/// only 3 tables need to load if size <= 192
|
||||
__mmask64 last_mask = MASK64 >> (192 - data_size);
|
||||
table3 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 128);
|
||||
table4 = _mm512_setzero_si512();
|
||||
}
|
||||
else
|
||||
{
|
||||
__mmask64 last_mask = MASK64 >> (256 - data_size);
|
||||
table3 = _mm512_loadu_epi8(data_pos + 128);
|
||||
table4 = _mm512_maskz_loadu_epi8(last_mask, data_pos + 192);
|
||||
}
|
||||
|
||||
/// 256 bytes table lookup can use: 2 permute2xvar_epi8 plus 1 blender with MSB
|
||||
while (pos < limit64)
|
||||
{
|
||||
__m512i vidx = _mm512_loadu_epi8(indexes_pos + pos);
|
||||
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
|
||||
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
|
||||
__mmask64 msb = _mm512_movepi8_mask(vidx);
|
||||
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
|
||||
_mm512_storeu_epi8(res_pos + pos, out);
|
||||
pos += 64;
|
||||
}
|
||||
if (limit > limit64)
|
||||
{
|
||||
__mmask64 tail_mask = MASK64 >> (limit64 + 64 - limit);
|
||||
__m512i vidx = _mm512_maskz_loadu_epi8(tail_mask, indexes_pos + pos);
|
||||
__m512i tmp1 = _mm512_permutex2var_epi8(table1, vidx, table2);
|
||||
__m512i tmp2 = _mm512_permutex2var_epi8(table3, vidx, table4);
|
||||
__mmask64 msb = _mm512_movepi8_mask(vidx);
|
||||
__m512i out = _mm512_mask_blend_epi8(msb, tmp1, tmp2);
|
||||
_mm512_mask_storeu_epi8(res_pos + pos, tail_mask, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
template <typename T>
|
||||
template <typename Type>
|
||||
ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
|
||||
@ -399,8 +521,18 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
|
||||
|
||||
auto res = this->create(limit);
|
||||
typename Self::Container & res_data = res->getData();
|
||||
for (size_t i = 0; i < limit; ++i)
|
||||
res_data[i] = data[indexes[i]];
|
||||
#if USE_MULTITARGET_CODE
|
||||
if constexpr (sizeof(T) == 1 && sizeof(Type) == 1)
|
||||
{
|
||||
/// VBMI optimization only applicable for (U)Int8 types
|
||||
if (isArchSupported(TargetArch::AVX512VBMI))
|
||||
{
|
||||
TargetSpecific::AVX512VBMI::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
TargetSpecific::Default::vectorIndexImpl<Container, Type>(data, indexes, limit, res_data);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
#include <limits>
|
||||
#include <typeinfo>
|
||||
#include <vector>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
|
||||
using namespace DB;
|
||||
|
||||
static pcg64 rng(randomSeed());
|
||||
@ -76,7 +76,6 @@ static void testFilter()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TEST(ColumnVector, Filter)
|
||||
{
|
||||
testFilter<UInt8>();
|
||||
@ -89,3 +88,71 @@ TEST(ColumnVector, Filter)
|
||||
testFilter<Float64>();
|
||||
testFilter<UUID>();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static MutableColumnPtr createIndexColumn(size_t limit, size_t rows)
|
||||
{
|
||||
auto column = ColumnVector<T>::create();
|
||||
auto & values = column->getData();
|
||||
auto max = std::numeric_limits<T>::max();
|
||||
limit = limit > max ? max : limit;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
T val = rng() % limit;
|
||||
values.push_back(val);
|
||||
}
|
||||
|
||||
return column;
|
||||
}
|
||||
|
||||
template <typename T, typename IndexType>
|
||||
static void testIndex()
|
||||
{
|
||||
static const std::vector<size_t> column_sizes = {64, 128, 196, 256, 512};
|
||||
|
||||
auto test_case = [&](size_t rows, size_t index_rows, size_t limit)
|
||||
{
|
||||
auto vector_column = createColumn<T>(rows);
|
||||
auto index_column = createIndexColumn<IndexType>(rows, index_rows);
|
||||
auto res_column = vector_column->index(*index_column, limit);
|
||||
if (limit == 0)
|
||||
limit = index_column->size();
|
||||
|
||||
/// check results
|
||||
if (limit != res_column->size())
|
||||
throw Exception(error_code, "ColumnVector index size not match to limit: {} {}", typeid(T).name(), typeid(IndexType).name());
|
||||
for (size_t i = 0; i < limit; ++i)
|
||||
{
|
||||
/// vector_column data is the same as index, so indexed column's value will equals to index_column.
|
||||
if (res_column->get64(i) != index_column->get64(i))
|
||||
throw Exception(error_code, "ColumnVector index fail: {} {}", typeid(T).name(), typeid(IndexType).name());
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < TEST_RUNS; ++i)
|
||||
{
|
||||
/// make sure rows distribute in (column_sizes[r-1], colulmn_sizes[r]]
|
||||
size_t row_idx = rng() % column_sizes.size();
|
||||
size_t row_base = row_idx > 0 ? column_sizes[row_idx - 1] : 0;
|
||||
size_t rows = row_base + (rng() % (column_sizes[row_idx] - row_base) + 1);
|
||||
size_t index_rows = rng() % MAX_ROWS + 1;
|
||||
|
||||
test_case(rows, index_rows, 0);
|
||||
test_case(rows, index_rows, static_cast<size_t>(0.5 * index_rows));
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
FAIL() << e.displayText();
|
||||
}
|
||||
}
|
||||
|
||||
TEST(ColumnVector, Index)
|
||||
{
|
||||
testIndex<UInt8, UInt8>();
|
||||
testIndex<UInt16, UInt8>();
|
||||
testIndex<UInt16, UInt16>();
|
||||
}
|
||||
|
48
src/Common/EventNotifier.cpp
Normal file
48
src/Common/EventNotifier.cpp
Normal file
@ -0,0 +1,48 @@
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
std::unique_ptr<EventNotifier> EventNotifier::event_notifier;
|
||||
|
||||
EventNotifier & EventNotifier::init()
|
||||
{
|
||||
if (event_notifier)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is initialized twice. This is a bug.");
|
||||
|
||||
event_notifier = std::make_unique<EventNotifier>();
|
||||
|
||||
return *event_notifier;
|
||||
}
|
||||
|
||||
EventNotifier & EventNotifier::instance()
|
||||
{
|
||||
if (!event_notifier)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "EventNotifier is not initialized. This is a bug.");
|
||||
|
||||
return *event_notifier;
|
||||
}
|
||||
|
||||
void EventNotifier::shutdown()
|
||||
{
|
||||
if (event_notifier)
|
||||
event_notifier.reset();
|
||||
}
|
||||
|
||||
size_t EventNotifier::calculateIdentifier(size_t a, size_t b)
|
||||
{
|
||||
size_t result = 0;
|
||||
boost::hash_combine(result, a);
|
||||
boost::hash_combine(result, b);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
92
src/Common/EventNotifier.h
Normal file
92
src/Common/EventNotifier.h
Normal file
@ -0,0 +1,92 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <iostream>
|
||||
|
||||
#include <base/types.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class EventNotifier
|
||||
{
|
||||
public:
|
||||
struct Handler
|
||||
{
|
||||
Handler(
|
||||
EventNotifier & parent_,
|
||||
size_t event_id_,
|
||||
size_t callback_id_)
|
||||
: parent(parent_)
|
||||
, event_id(event_id_)
|
||||
, callback_id(callback_id_)
|
||||
{}
|
||||
|
||||
~Handler()
|
||||
{
|
||||
std::lock_guard lock(parent.mutex);
|
||||
|
||||
parent.callback_table[event_id].erase(callback_id);
|
||||
parent.storage.erase(callback_id);
|
||||
}
|
||||
|
||||
private:
|
||||
EventNotifier & parent;
|
||||
size_t event_id;
|
||||
size_t callback_id;
|
||||
};
|
||||
|
||||
using HandlerPtr = std::shared_ptr<Handler>;
|
||||
|
||||
static EventNotifier & init();
|
||||
static EventNotifier & instance();
|
||||
static void shutdown();
|
||||
|
||||
template <typename EventType, typename Callback>
|
||||
[[ nodiscard ]] HandlerPtr subscribe(EventType event, Callback && callback)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto event_id = DefaultHash64(event);
|
||||
auto callback_id = calculateIdentifier(event_id, ++counter);
|
||||
|
||||
callback_table[event_id].insert(callback_id);
|
||||
storage[callback_id] = std::forward<Callback>(callback);
|
||||
|
||||
return std::make_shared<Handler>(*this, event_id, callback_id);
|
||||
}
|
||||
|
||||
template <typename EventType>
|
||||
void notify(EventType event)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
for (const auto & identifier : callback_table[DefaultHash64(event)])
|
||||
storage[identifier]();
|
||||
}
|
||||
|
||||
private:
|
||||
// To move boost include for .h file
|
||||
static size_t calculateIdentifier(size_t a, size_t b);
|
||||
|
||||
using CallbackType = std::function<void()>;
|
||||
using CallbackStorage = std::map<size_t, CallbackType>;
|
||||
using EventToCallbacks = std::map<size_t, std::set<size_t>>;
|
||||
|
||||
std::mutex mutex;
|
||||
|
||||
EventToCallbacks callback_table;
|
||||
CallbackStorage storage;
|
||||
size_t counter{0};
|
||||
|
||||
static std::unique_ptr<EventNotifier> event_notifier;
|
||||
};
|
||||
|
||||
}
|
@ -139,12 +139,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
init(args_);
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
|
@ -1,15 +1,16 @@
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <base/getThreadId.h>
|
||||
|
||||
#include <Common/config.h>
|
||||
@ -874,7 +875,11 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
|
||||
/// No new requests will appear in queue after finish()
|
||||
bool was_already_finished = requests_queue.finish();
|
||||
if (!was_already_finished)
|
||||
{
|
||||
active_session_metric_increment.destroy();
|
||||
/// Notify all subscribers (ReplicatedMergeTree tables) about expired session
|
||||
EventNotifier::instance().notify(Error::ZSESSIONEXPIRED);
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
|
53
src/Common/tests/gtest_event_notifier.cpp
Normal file
53
src/Common/tests/gtest_event_notifier.cpp
Normal file
@ -0,0 +1,53 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
|
||||
|
||||
TEST(EventNotifier, SimpleTest)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
size_t result = 1;
|
||||
EventNotifier::init();
|
||||
|
||||
auto handler3 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 3; });
|
||||
|
||||
{
|
||||
auto handler5 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 5; });
|
||||
}
|
||||
|
||||
auto handler7 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 7; });
|
||||
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 21);
|
||||
|
||||
result = 1;
|
||||
handler3.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 7);
|
||||
|
||||
auto handler11 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 11; });
|
||||
|
||||
result = 1;
|
||||
handler7.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 11);
|
||||
|
||||
EventNotifier::HandlerPtr handler13;
|
||||
{
|
||||
handler13 = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [&result](){ result *= 13; });
|
||||
}
|
||||
|
||||
result = 1;
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 143);
|
||||
|
||||
result = 1;
|
||||
handler11.reset();
|
||||
handler13.reset();
|
||||
EventNotifier::instance().notify(Coordination::Error::ZSESSIONEXPIRED);
|
||||
ASSERT_EQ(result, 1);
|
||||
|
||||
EventNotifier::shutdown();
|
||||
}
|
@ -245,8 +245,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
|
||||
credentials.setPassword(named_collection->configuration.password);
|
||||
|
||||
header_entries.reserve(named_collection->configuration.headers.size());
|
||||
for (const auto & header : named_collection->configuration.headers)
|
||||
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>()));
|
||||
for (const auto & [key, value] : named_collection->configuration.headers)
|
||||
header_entries.emplace_back(std::make_tuple(key, value));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
@ -53,7 +54,31 @@ const std::string & MetadataStorageFromStaticFilesWebServer::getPath() const
|
||||
|
||||
bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) const
|
||||
{
|
||||
return object_storage.files.contains(path);
|
||||
fs::path fs_path(path);
|
||||
if (fs_path.has_extension())
|
||||
fs_path = fs_path.parent_path();
|
||||
|
||||
initializeIfNeeded(fs_path, false);
|
||||
|
||||
if (object_storage.files.empty())
|
||||
return false;
|
||||
|
||||
if (object_storage.files.contains(path))
|
||||
return true;
|
||||
|
||||
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
|
||||
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
|
||||
auto it = std::lower_bound(
|
||||
object_storage.files.begin(),
|
||||
object_storage.files.end(),
|
||||
path,
|
||||
[](const auto & file, const std::string & path_) { return file.first < path_; }
|
||||
);
|
||||
if (startsWith(it->first, path)
|
||||
|| (it != object_storage.files.begin() && startsWith(std::prev(it)->first, path)))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
|
||||
@ -98,7 +123,10 @@ uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & pat
|
||||
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
|
||||
{
|
||||
assertExists(path);
|
||||
return {StoredObject::create(object_storage, path, object_storage.files.at(path).size, true)};
|
||||
auto fs_path = fs::path(object_storage.url) / path;
|
||||
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||
remote_path = remote_path.substr(object_storage.url.size());
|
||||
return {StoredObject::create(object_storage, remote_path, object_storage.files.at(path).size, true)};
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const
|
||||
@ -112,7 +140,7 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
|
||||
return result;
|
||||
}
|
||||
|
||||
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const
|
||||
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
|
||||
{
|
||||
if (object_storage.files.find(path) == object_storage.files.end())
|
||||
{
|
||||
@ -123,7 +151,7 @@ bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::stri
|
||||
catch (...)
|
||||
{
|
||||
const auto message = getCurrentExceptionMessage(false);
|
||||
bool can_throw = CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
||||
bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
|
||||
if (can_throw)
|
||||
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
|
||||
|
||||
@ -140,13 +168,17 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
|
||||
std::vector<fs::path> dir_file_paths;
|
||||
|
||||
if (!initializeIfNeeded(path))
|
||||
{
|
||||
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
|
||||
}
|
||||
|
||||
assertExists(path);
|
||||
|
||||
for (const auto & [file_path, _] : object_storage.files)
|
||||
if (parentPath(file_path) == path)
|
||||
{
|
||||
if (fs::path(parentPath(file_path)) / "" == fs::path(path) / "")
|
||||
dir_file_paths.emplace_back(file_path);
|
||||
}
|
||||
|
||||
LOG_TRACE(object_storage.log, "Iterate directory {} with {} files", path, dir_file_paths.size());
|
||||
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
|
||||
|
@ -19,7 +19,7 @@ private:
|
||||
|
||||
void assertExists(const std::string & path) const;
|
||||
|
||||
bool initializeIfNeeded(const std::string & path) const;
|
||||
bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);
|
||||
|
@ -30,7 +30,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int NETWORK_ERROR;
|
||||
}
|
||||
|
||||
@ -153,20 +152,6 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
const auto & path = object.absolute_path;
|
||||
LOG_TRACE(log, "Read from path: {}", path);
|
||||
|
||||
auto iter = files.find(path);
|
||||
if (iter == files.end())
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File path {} does not exist", path);
|
||||
|
||||
auto fs_path = fs::path(url) / path;
|
||||
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||
remote_path = remote_path.string().substr(url.size());
|
||||
|
||||
StoredObjects objects;
|
||||
objects.emplace_back(remote_path, iter->second.size);
|
||||
|
||||
auto read_buffer_creator =
|
||||
[this, read_settings]
|
||||
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
|
||||
@ -179,7 +164,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
read_until_position);
|
||||
};
|
||||
|
||||
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings);
|
||||
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
|
||||
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
|
@ -114,7 +114,7 @@ protected:
|
||||
size_t size = 0;
|
||||
};
|
||||
|
||||
using Files = std::unordered_map<String, FileData>; /// file path -> file data
|
||||
using Files = std::map<String, FileData>; /// file path -> file data
|
||||
mutable Files files;
|
||||
|
||||
String url;
|
||||
|
@ -39,6 +39,7 @@ StoragePolicy::StoragePolicy(
|
||||
const String & config_prefix,
|
||||
DiskSelectorPtr disks)
|
||||
: name(std::move(name_))
|
||||
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
String volumes_prefix = config_prefix + ".volumes";
|
||||
@ -81,11 +82,15 @@ StoragePolicy::StoragePolicy(
|
||||
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
buildVolumeIndices();
|
||||
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
|
||||
}
|
||||
|
||||
|
||||
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
|
||||
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
|
||||
: volumes(std::move(volumes_))
|
||||
, name(std::move(name_))
|
||||
, move_factor(move_factor_)
|
||||
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
|
||||
{
|
||||
if (volumes.empty())
|
||||
throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
||||
@ -94,6 +99,7 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
|
||||
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
buildVolumeIndices();
|
||||
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
|
||||
}
|
||||
|
||||
|
||||
@ -206,12 +212,16 @@ UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
|
||||
|
||||
ReservationPtr StoragePolicy::reserve(UInt64 bytes, size_t min_volume_index) const
|
||||
{
|
||||
LOG_TRACE(log, "Reserving bytes {} from volume index {}, total volumes {}", bytes, min_volume_index, volumes.size());
|
||||
for (size_t i = min_volume_index; i < volumes.size(); ++i)
|
||||
{
|
||||
const auto & volume = volumes[i];
|
||||
auto reservation = volume->reserve(bytes);
|
||||
if (reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Successfully reserved {} bytes on volume index {}", bytes, i);
|
||||
return reservation;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
@ -104,6 +104,8 @@ private:
|
||||
double move_factor = 0.1; /// by default move factor is 10%
|
||||
|
||||
void buildVolumeIndices();
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
|
||||
|
@ -274,6 +274,8 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
|
||||
}
|
||||
|
||||
/// In case of error this address will be written to logs
|
||||
request.SetResolvedRemoteHost(session->getResolvedAddress());
|
||||
|
||||
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
|
||||
|
||||
|
@ -114,6 +114,7 @@ struct URI
|
||||
bool is_virtual_hosted_style;
|
||||
|
||||
explicit URI(const Poco::URI & uri_);
|
||||
explicit URI(const std::string & uri_) : URI(Poco::URI(uri_)) {}
|
||||
|
||||
static void validateBucket(const String & bucket, const Poco::URI & uri);
|
||||
};
|
||||
|
@ -100,9 +100,14 @@ BlockIO InterpreterCreateUserQuery::execute()
|
||||
auto & access_control = getContext()->getAccessControl();
|
||||
auto access = getContext()->getAccess();
|
||||
access->checkAccess(query.alter ? AccessType::ALTER_USER : AccessType::CREATE_USER);
|
||||
bool implicit_no_password_allowed = access_control.isImplicitNoPasswordAllowed();
|
||||
bool no_password_allowed = access_control.isNoPasswordAllowed();
|
||||
bool plaintext_password_allowed = access_control.isPlaintextPasswordAllowed();
|
||||
|
||||
if (!query.attach && !query.alter && !query.auth_data && !implicit_no_password_allowed)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Authentication type NO_PASSWORD must be explicitly specified, check the setting allow_implicit_no_password in the server configuration");
|
||||
|
||||
std::optional<RolesOrUsersSet> default_roles_from_query;
|
||||
if (query.default_roles)
|
||||
{
|
||||
|
@ -181,19 +181,19 @@ public:
|
||||
/// because each projection can provide some columns as inputs to substitute certain sub-DAGs
|
||||
/// (expressions). Consider the following example:
|
||||
/// CREATE TABLE tbl (dt DateTime, val UInt64,
|
||||
/// PROJECTION p_hour (SELECT SUM(val) GROUP BY toStartOfHour(dt)));
|
||||
/// PROJECTION p_hour (SELECT sum(val) GROUP BY toStartOfHour(dt)));
|
||||
///
|
||||
/// Query: SELECT toStartOfHour(dt), SUM(val) FROM tbl GROUP BY toStartOfHour(dt);
|
||||
/// Query: SELECT toStartOfHour(dt), sum(val) FROM tbl GROUP BY toStartOfHour(dt);
|
||||
///
|
||||
/// We will have an ActionsDAG like this:
|
||||
/// FUNCTION: toStartOfHour(dt) SUM(val)
|
||||
/// FUNCTION: toStartOfHour(dt) sum(val)
|
||||
/// ^ ^
|
||||
/// | |
|
||||
/// INPUT: dt val
|
||||
///
|
||||
/// Now we traverse the DAG and see if any FUNCTION node can be replaced by projection's INPUT node.
|
||||
/// The result DAG will be:
|
||||
/// INPUT: toStartOfHour(dt) SUM(val)
|
||||
/// INPUT: toStartOfHour(dt) sum(val)
|
||||
///
|
||||
/// We don't need aggregate columns from projection because they are matched after DAG.
|
||||
/// Currently we use canonical names of each node to find matches. It can be improved after we
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -16,7 +17,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
|
||||
if (!config.has(config_prefix + ".max_size"))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected cache size (`max_size`) in configuration");
|
||||
|
||||
max_size = config.getUInt64(config_prefix + ".max_size", 0);
|
||||
max_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_size"));
|
||||
if (max_size == 0)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected non-zero size for cache configuration");
|
||||
|
||||
@ -25,7 +26,10 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk Cache requires non-empty `path` field (cache base path) in config");
|
||||
|
||||
max_elements = config.getUInt64(config_prefix + ".max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
|
||||
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
|
||||
if (config.has(config_prefix + ".max_file_segment_size"))
|
||||
max_file_segment_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_file_segment_size"));
|
||||
else
|
||||
max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
|
||||
|
||||
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
|
||||
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/formatReadable.h>
|
||||
@ -510,6 +511,7 @@ void Context::initGlobal()
|
||||
assert(!global_context_instance);
|
||||
global_context_instance = shared_from_this();
|
||||
DatabaseCatalog::init(shared_from_this());
|
||||
EventNotifier::init();
|
||||
}
|
||||
|
||||
SharedContextHolder Context::createShared()
|
||||
|
@ -5,6 +5,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Rewrite function names to their canonical forms.
|
||||
/// For example, rewrite (1) to (2)
|
||||
/// (1) SELECT suM(1), AVG(2);
|
||||
/// (2) SELECT sum(1), avg(2);
|
||||
///
|
||||
/// It's used to help projection query analysis matching function nodes by their canonical names.
|
||||
/// See the comment of ActionsDAG::foldActionsByProjection for details.
|
||||
struct FunctionNameNormalizer
|
||||
{
|
||||
static void visit(IAST *);
|
||||
|
@ -1,15 +1,20 @@
|
||||
#include <Processors/Formats/InputFormatErrorsLogger.h>
|
||||
#include <Processors/Formats/IRowOutputFormat.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Processors/Formats/IRowOutputFormat.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int DATABASE_ACCESS_DENIED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
const String DEFAULT_OUTPUT_FORMAT = "CSV";
|
||||
@ -26,8 +31,19 @@ InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
|
||||
database = context->getInsertionTable().getDatabaseName();
|
||||
|
||||
String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path;
|
||||
errors_file_path = context->getApplicationType() == Context::ApplicationType::SERVER ? context->getUserFilesPath() + path_in_setting
|
||||
: path_in_setting;
|
||||
|
||||
if (context->getApplicationType() == Context::ApplicationType::SERVER)
|
||||
{
|
||||
auto user_files_path = context->getUserFilesPath();
|
||||
errors_file_path = fs::path(user_files_path) / path_in_setting;
|
||||
if (!fileOrSymlinkPathStartsWith(errors_file_path, user_files_path))
|
||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Cannot log errors in path `{}`, because it is not inside `{}`", errors_file_path, user_files_path);
|
||||
}
|
||||
else
|
||||
{
|
||||
errors_file_path = path_in_setting;
|
||||
}
|
||||
|
||||
while (fs::exists(errors_file_path))
|
||||
{
|
||||
errors_file_path += "_new";
|
||||
|
@ -307,7 +307,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
|
||||
{
|
||||
const auto header_prefix = headers_prefix + header;
|
||||
configuration.headers.emplace_back(
|
||||
std::make_pair(headers_config->getString(header_prefix + ".name"), headers_config->getString(header_prefix + ".value")));
|
||||
headers_config->getString(header_prefix + ".name"),
|
||||
headers_config->getString(header_prefix + ".value"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -446,7 +447,9 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
|
||||
for (const auto & header : header_keys)
|
||||
{
|
||||
const auto header_prefix = config_prefix + ".headers." + header;
|
||||
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
|
||||
configuration.headers.emplace_back(
|
||||
config.getString(header_prefix + ".name"),
|
||||
config.getString(header_prefix + ".value"));
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Storages/HeaderCollection.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -108,7 +109,7 @@ struct URLBasedDataSourceConfiguration
|
||||
String user;
|
||||
String password;
|
||||
|
||||
std::vector<std::pair<String, Field>> headers;
|
||||
HeaderCollection headers;
|
||||
String http_method;
|
||||
|
||||
void set(const URLBasedDataSourceConfiguration & conf);
|
||||
|
18
src/Storages/HeaderCollection.h
Normal file
18
src/Storages/HeaderCollection.h
Normal file
@ -0,0 +1,18 @@
|
||||
#pragma once
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct HttpHeader
|
||||
{
|
||||
std::string name;
|
||||
std::string value;
|
||||
|
||||
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
|
||||
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
|
||||
};
|
||||
|
||||
using HeaderCollection = std::vector<HttpHeader>;
|
||||
|
||||
}
|
@ -100,8 +100,10 @@ struct ReplicatedFetchReadCallback
|
||||
}
|
||||
|
||||
|
||||
Service::Service(StorageReplicatedMergeTree & data_) :
|
||||
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
|
||||
Service::Service(StorageReplicatedMergeTree & data_)
|
||||
: data(data_)
|
||||
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Replicated PartsService)"))
|
||||
{}
|
||||
|
||||
std::string Service::getId(const std::string & node_id) const
|
||||
{
|
||||
@ -444,6 +446,11 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
|
||||
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
|
||||
}
|
||||
|
||||
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
|
||||
: data(data_)
|
||||
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Fetcher)"))
|
||||
{}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
@ -494,6 +501,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
|
||||
if (disk)
|
||||
{
|
||||
LOG_TRACE(log, "Will fetch to disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
UInt64 revision = disk->getRevision();
|
||||
if (revision)
|
||||
uri.addQueryParameter("disk_revision", toString(revision));
|
||||
@ -504,13 +512,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
{
|
||||
if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Trying to fetch with zero-copy replication, but disk is not provided, will try to select");
|
||||
Disks disks = data.getDisks();
|
||||
for (const auto & data_disk : disks)
|
||||
{
|
||||
LOG_TRACE(log, "Checking disk {} with type {}", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
|
||||
if (data_disk->supportZeroCopyReplication())
|
||||
{
|
||||
LOG_TRACE(log, "Disk {} (with type {}) supports zero-copy replication", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
|
||||
capability.push_back(toString(data_disk->getDataSourceDescription().type));
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (disk->supportZeroCopyReplication())
|
||||
{
|
||||
LOG_TRACE(log, "Trying to fetch with zero copy replication, provided disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
capability.push_back(toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
}
|
||||
@ -562,29 +578,47 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
|
||||
assertString("ttl format version: 1\n", ttl_infos_buffer);
|
||||
ttl_infos.read(ttl_infos_buffer);
|
||||
|
||||
if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using storage balanced reservation");
|
||||
reservation
|
||||
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using TTL rules");
|
||||
reservation
|
||||
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Making balanced reservation");
|
||||
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Making simple reservation");
|
||||
reservation = data.reserveSpace(sum_files_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Making reservation on the largest disk");
|
||||
/// We don't know real size of part because sender server version is too old
|
||||
reservation = data.makeEmptyReservationOnLargestDisk();
|
||||
}
|
||||
|
||||
if (!disk)
|
||||
{
|
||||
disk = reservation->getDisk();
|
||||
LOG_INFO(log, "Disk for fetch is not provided, getting disk from reservation {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO(log, "Disk for fetch is disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
|
||||
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
|
||||
if (revision)
|
||||
@ -989,7 +1023,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
|
||||
|
||||
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
|
||||
{
|
||||
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
|
||||
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
|
||||
|
@ -67,7 +67,7 @@ private:
|
||||
class Fetcher final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
|
||||
explicit Fetcher(StorageReplicatedMergeTree & data_);
|
||||
|
||||
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
|
||||
MergeTreeData::MutableDataPartPtr fetchSelectedPart(
|
||||
|
@ -4909,12 +4909,14 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
{
|
||||
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
|
||||
|
||||
LOG_TRACE(log, "Trying reserve {} bytes preffering TTL rules", expected_size);
|
||||
ReservationPtr reservation;
|
||||
|
||||
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);
|
||||
|
||||
if (move_ttl_entry)
|
||||
{
|
||||
LOG_TRACE(log, "Got move TTL entry, will try to reserver destination for move");
|
||||
SpacePtr destination_ptr = getDestinationForMoveTTL(*move_ttl_entry, is_insert);
|
||||
if (!destination_ptr)
|
||||
{
|
||||
@ -4935,10 +4937,15 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Reserving bytes on selected destination");
|
||||
reservation = destination_ptr->reserve(expected_size);
|
||||
if (reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Reservation successful");
|
||||
return reservation;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
@ -4951,15 +4958,22 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
"Would like to reserve space on disk '{}' by TTL rule of table '{}' but there is not enough space",
|
||||
move_ttl_entry->destination_name,
|
||||
*std::atomic_load(&log_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prefer selected_disk
|
||||
if (selected_disk)
|
||||
{
|
||||
LOG_DEBUG(log, "Disk for reservation provided: {} (with type {})", selected_disk->getName(), toString(selected_disk->getDataSourceDescription().type));
|
||||
reservation = selected_disk->reserve(expected_size);
|
||||
}
|
||||
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_DEBUG(log, "No reservation, reserving using storage policy from min volume index {}", min_volume_index);
|
||||
reservation = getStoragePolicy()->reserve(expected_size, min_volume_index);
|
||||
}
|
||||
|
||||
return reservation;
|
||||
}
|
||||
|
@ -1467,6 +1467,8 @@ bool MutateTask::execute()
|
||||
}
|
||||
case State::NEED_EXECUTE:
|
||||
{
|
||||
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
|
||||
|
||||
if (task->executeStep())
|
||||
return true;
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -24,6 +25,17 @@ static String formattedAST(const ASTPtr & ast)
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
static String formattedASTNormalized(const ASTPtr & ast)
|
||||
{
|
||||
if (!ast)
|
||||
return "";
|
||||
auto ast_normalized = ast->clone();
|
||||
FunctionNameNormalizer().visit(ast_normalized.get());
|
||||
WriteBufferFromOwnString buf;
|
||||
formatAST(*ast_normalized, buf, false, true);
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTreeData & data, const StorageMetadataPtr & metadata_snapshot)
|
||||
{
|
||||
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
@ -33,7 +45,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
}
|
||||
|
||||
const auto data_settings = data.getSettings();
|
||||
sampling_expression = formattedAST(metadata_snapshot->getSamplingKeyAST());
|
||||
sampling_expression = formattedASTNormalized(metadata_snapshot->getSamplingKeyAST());
|
||||
index_granularity = data_settings->index_granularity;
|
||||
merging_params_mode = static_cast<int>(data.merging_params.mode);
|
||||
sign_column = data.merging_params.sign_column;
|
||||
@ -45,7 +57,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
/// - When we have only ORDER BY, than store it in "primary key:" row of /metadata
|
||||
/// - When we have both, than store PRIMARY KEY in "primary key:" row and ORDER BY in "sorting key:" row of /metadata
|
||||
|
||||
primary_key = formattedAST(metadata_snapshot->getPrimaryKey().expression_list_ast);
|
||||
primary_key = formattedASTNormalized(metadata_snapshot->getPrimaryKey().expression_list_ast);
|
||||
if (metadata_snapshot->isPrimaryKeyDefined())
|
||||
{
|
||||
/// We don't use preparsed AST `sorting_key.expression_list_ast` because
|
||||
@ -54,15 +66,15 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
/// compatible way is just to convert definition_ast to list and
|
||||
/// serialize it. In all other places key.expression_list_ast should be
|
||||
/// used.
|
||||
sorting_key = formattedAST(extractKeyExpressionList(metadata_snapshot->getSortingKey().definition_ast));
|
||||
sorting_key = formattedASTNormalized(extractKeyExpressionList(metadata_snapshot->getSortingKey().definition_ast));
|
||||
}
|
||||
|
||||
data_format_version = data.format_version;
|
||||
|
||||
if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
partition_key = formattedAST(metadata_snapshot->getPartitionKey().expression_list_ast);
|
||||
partition_key = formattedASTNormalized(metadata_snapshot->getPartitionKey().expression_list_ast);
|
||||
|
||||
ttl_table = formattedAST(metadata_snapshot->getTableTTLs().definition_ast);
|
||||
ttl_table = formattedASTNormalized(metadata_snapshot->getTableTTLs().definition_ast);
|
||||
|
||||
skip_indices = metadata_snapshot->getSecondaryIndices().toString();
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
|
||||
@ -565,9 +566,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
}
|
||||
|
||||
auto minmax_columns = metadata.getColumnsRequiredForPartitionKey();
|
||||
auto partition_key = metadata.partition_key.expression_list_ast->clone();
|
||||
FunctionNameNormalizer().visit(partition_key.get());
|
||||
auto primary_key_asts = metadata.primary_key.expression_list_ast->children;
|
||||
metadata.minmax_count_projection.emplace(ProjectionDescription::getMinMaxCountProjection(
|
||||
args.columns, metadata.partition_key.expression_list_ast, minmax_columns, primary_key_asts, args.getContext()));
|
||||
args.columns, partition_key, minmax_columns, primary_key_asts, args.getContext()));
|
||||
|
||||
if (args.storage_def->sample_by)
|
||||
metadata.sampling_key = KeyDescription::getKeyFromAST(args.storage_def->sample_by->ptr(), metadata.columns, args.getContext());
|
||||
@ -647,9 +650,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
++arg_num;
|
||||
|
||||
auto minmax_columns = metadata.getColumnsRequiredForPartitionKey();
|
||||
auto partition_key = metadata.partition_key.expression_list_ast->clone();
|
||||
FunctionNameNormalizer().visit(partition_key.get());
|
||||
auto primary_key_asts = metadata.primary_key.expression_list_ast->children;
|
||||
metadata.minmax_count_projection.emplace(ProjectionDescription::getMinMaxCountProjection(
|
||||
args.columns, metadata.partition_key.expression_list_ast, minmax_columns, primary_key_asts, args.getContext()));
|
||||
args.columns, partition_key, minmax_columns, primary_key_asts, args.getContext()));
|
||||
|
||||
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
|
||||
if (ast && ast->value.getType() == Field::Types::UInt64)
|
||||
|
@ -494,9 +494,18 @@ void StorageKeeperMap::drop()
|
||||
checkTable<true>();
|
||||
auto client = getClient();
|
||||
|
||||
client->remove(table_path);
|
||||
// we allow ZNONODE in case we got hardware error on previous drop
|
||||
if (auto code = client->tryRemove(table_path); code == Coordination::Error::ZNOTEMPTY)
|
||||
{
|
||||
throw zkutil::KeeperException(
|
||||
code, "{} contains children which shouldn't happen. Please DETACH the table if you want to delete it", table_path);
|
||||
}
|
||||
|
||||
if (!client->getChildren(tables_path).empty())
|
||||
std::vector<std::string> children;
|
||||
// if the tables_path is not found, some other table removed it
|
||||
// if there are children, some other tables are still using this path as storage
|
||||
if (auto code = client->tryGetChildren(tables_path, children);
|
||||
code != Coordination::Error::ZOK || !children.empty())
|
||||
return;
|
||||
|
||||
Coordination::Requests ops;
|
||||
|
@ -4185,6 +4185,11 @@ void StorageReplicatedMergeTree::startupImpl()
|
||||
|
||||
/// In this thread replica will be activated.
|
||||
restarting_thread.start();
|
||||
/// And this is just a callback
|
||||
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
|
||||
{
|
||||
restarting_thread.start();
|
||||
});
|
||||
|
||||
/// Wait while restarting_thread finishing initialization.
|
||||
/// NOTE It does not mean that replication is actually started after receiving this event.
|
||||
@ -4228,6 +4233,8 @@ void StorageReplicatedMergeTree::shutdown()
|
||||
if (shutdown_called.exchange(true))
|
||||
return;
|
||||
|
||||
session_expired_callback_handler.reset();
|
||||
|
||||
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
|
||||
fetcher.blocker.cancelForever();
|
||||
merger_mutator.merges_blocker.cancelForever();
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Common/EventNotifier.h>
|
||||
#include <base/defines.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
@ -453,6 +454,7 @@ private:
|
||||
|
||||
/// A thread that processes reconnection to ZooKeeper when the session expires.
|
||||
ReplicatedMergeTreeRestartingThread restarting_thread;
|
||||
EventNotifier::HandlerPtr session_expired_callback_handler;
|
||||
|
||||
/// A thread that attaches the table using ZooKeeper
|
||||
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Storages/getVirtualsForStorage.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
@ -767,33 +768,28 @@ private:
|
||||
|
||||
|
||||
StorageS3::StorageS3(
|
||||
const S3::URI & uri_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const String & compression_method_,
|
||||
bool distributed_processing_,
|
||||
ASTPtr partition_by_)
|
||||
: IStorage(table_id_)
|
||||
, s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later
|
||||
, keys({uri_.key})
|
||||
, format_name(format_name_)
|
||||
, compression_method(compression_method_)
|
||||
, name(uri_.storage_name)
|
||||
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, keys({s3_configuration.uri.key})
|
||||
, format_name(configuration_.format)
|
||||
, compression_method(configuration_.compression_method)
|
||||
, name(s3_configuration.uri.storage_name)
|
||||
, distributed_processing(distributed_processing_)
|
||||
, format_settings(format_settings_)
|
||||
, partition_by(partition_by_)
|
||||
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
|
||||
, is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos)
|
||||
{
|
||||
FormatFactory::instance().checkFormatName(format_name);
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri);
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
||||
updateS3Configuration(context_, s3_configuration);
|
||||
@ -1062,47 +1058,48 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
|
||||
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
|
||||
{
|
||||
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
|
||||
const auto & config_rw_settings = settings.rw_settings;
|
||||
|
||||
bool need_update_configuration = settings != S3Settings{};
|
||||
if (need_update_configuration)
|
||||
{
|
||||
if (upd.rw_settings != settings.rw_settings)
|
||||
upd.rw_settings = settings.rw_settings;
|
||||
}
|
||||
if (upd.rw_settings != config_rw_settings)
|
||||
upd.rw_settings = settings.rw_settings;
|
||||
|
||||
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
|
||||
|
||||
if (upd.client && (!upd.access_key_id.empty() || settings.auth_settings == upd.auth_settings))
|
||||
return;
|
||||
|
||||
Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
|
||||
HeaderCollection headers;
|
||||
if (upd.access_key_id.empty())
|
||||
if (upd.client)
|
||||
{
|
||||
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
|
||||
headers = settings.auth_settings.headers;
|
||||
if (upd.static_configuration)
|
||||
return;
|
||||
|
||||
if (settings.auth_settings == upd.auth_settings)
|
||||
return;
|
||||
}
|
||||
|
||||
upd.auth_settings.updateFrom(settings.auth_settings);
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
settings.auth_settings.region,
|
||||
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
|
||||
upd.auth_settings.region,
|
||||
ctx->getRemoteHostFilter(),
|
||||
ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
|
||||
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = upd.uri.endpoint;
|
||||
client_configuration.maxConnections = upd.rw_settings.max_connections;
|
||||
|
||||
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
|
||||
auto headers = upd.auth_settings.headers;
|
||||
if (!upd.headers_from_ast.empty())
|
||||
headers.insert(headers.end(), upd.headers_from_ast.begin(), upd.headers_from_ast.end());
|
||||
|
||||
upd.client = S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
upd.uri.is_virtual_hosted_style,
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
settings.auth_settings.server_side_encryption_customer_key_base64,
|
||||
upd.auth_settings.server_side_encryption_customer_key_base64,
|
||||
std::move(headers),
|
||||
settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
|
||||
settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
|
||||
|
||||
upd.auth_settings = std::move(settings.auth_settings);
|
||||
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
|
||||
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
|
||||
}
|
||||
|
||||
|
||||
@ -1155,6 +1152,10 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
|
||||
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
|
||||
if (header_it != engine_args.end())
|
||||
engine_args.erase(header_it);
|
||||
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
|
||||
|
||||
@ -1184,19 +1185,23 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
|
||||
}
|
||||
|
||||
ColumnsDescription StorageS3::getTableStructureFromData(
|
||||
const String & format,
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & compression_method,
|
||||
const StorageS3Configuration & configuration,
|
||||
bool distributed_processing,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos)
|
||||
{
|
||||
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
|
||||
S3Configuration s3_configuration{
|
||||
configuration.url,
|
||||
configuration.auth_settings,
|
||||
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
|
||||
configuration.headers};
|
||||
|
||||
updateS3Configuration(ctx, s3_configuration);
|
||||
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
|
||||
|
||||
return getTableStructureFromDataImpl(
|
||||
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
|
||||
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
@ -1308,25 +1313,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
|
||||
format_settings = getFormatSettings(args.getContext());
|
||||
}
|
||||
|
||||
S3::URI s3_uri(Poco::URI(configuration.url));
|
||||
|
||||
ASTPtr partition_by;
|
||||
if (args.storage_def->partition_by)
|
||||
partition_by = args.storage_def->partition_by->clone();
|
||||
|
||||
return std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
args.table_id,
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
args.columns,
|
||||
args.constraints,
|
||||
args.comment,
|
||||
args.getContext(),
|
||||
format_settings,
|
||||
configuration.compression_method,
|
||||
/* distributed_processing_ */false,
|
||||
partition_by);
|
||||
},
|
||||
|
@ -149,18 +149,13 @@ class StorageS3 : public IStorage, WithContext
|
||||
{
|
||||
public:
|
||||
StorageS3(
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const StorageS3Configuration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
const String & format_name_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
const String & compression_method_ = "",
|
||||
bool distributed_processing_ = false,
|
||||
ASTPtr partition_by_ = nullptr);
|
||||
|
||||
@ -189,11 +184,7 @@ public:
|
||||
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(
|
||||
const String & format,
|
||||
const S3::URI & uri,
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & compression_method,
|
||||
const StorageS3Configuration & configuration,
|
||||
bool distributed_processing,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx,
|
||||
@ -204,11 +195,28 @@ public:
|
||||
struct S3Configuration
|
||||
{
|
||||
const S3::URI uri;
|
||||
const String access_key_id;
|
||||
const String secret_access_key;
|
||||
std::shared_ptr<const Aws::S3::S3Client> client;
|
||||
|
||||
S3Settings::AuthSettings auth_settings;
|
||||
S3Settings::ReadWriteSettings rw_settings;
|
||||
|
||||
/// If s3 configuration was passed from ast, then it is static.
|
||||
/// If from config - it can be changed with config reload.
|
||||
bool static_configuration = true;
|
||||
|
||||
/// Headers from ast is a part of static configuration.
|
||||
HeaderCollection headers_from_ast;
|
||||
|
||||
S3Configuration(
|
||||
const String & url_,
|
||||
const S3Settings::AuthSettings & auth_settings_,
|
||||
const S3Settings::ReadWriteSettings & rw_settings_,
|
||||
const HeaderCollection & headers_from_ast_)
|
||||
: uri(S3::URI(url_))
|
||||
, auth_settings(auth_settings_)
|
||||
, rw_settings(rw_settings_)
|
||||
, static_configuration(!auth_settings_.access_key_id.empty())
|
||||
, headers_from_ast(headers_from_ast_) {}
|
||||
};
|
||||
|
||||
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
|
||||
|
@ -46,22 +46,17 @@
|
||||
namespace DB
|
||||
{
|
||||
StorageS3Cluster::StorageS3Cluster(
|
||||
const String & filename_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3ClusterConfiguration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
String cluster_name_,
|
||||
const String & format_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
const String & compression_method_)
|
||||
ContextPtr context_)
|
||||
: IStorage(table_id_)
|
||||
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())}
|
||||
, filename(filename_)
|
||||
, cluster_name(cluster_name_)
|
||||
, format_name(format_name_)
|
||||
, compression_method(compression_method_)
|
||||
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
|
||||
, filename(configuration_.url)
|
||||
, cluster_name(configuration_.cluster_name)
|
||||
, format_name(configuration_.format)
|
||||
, compression_method(configuration_.compression_method)
|
||||
{
|
||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
@ -21,16 +21,11 @@ class StorageS3Cluster : public IStorage
|
||||
{
|
||||
public:
|
||||
StorageS3Cluster(
|
||||
const String & filename_,
|
||||
const String & access_key_id_,
|
||||
const String & secret_access_key_,
|
||||
const StorageS3ClusterConfiguration & configuration_,
|
||||
const StorageID & table_id_,
|
||||
String cluster_name_,
|
||||
const String & format_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
ContextPtr context_,
|
||||
const String & compression_method_);
|
||||
ContextPtr context_);
|
||||
|
||||
std::string getName() const override { return "S3Cluster"; }
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <vector>
|
||||
#include <base/types.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Storages/HeaderCollection.h>
|
||||
|
||||
namespace Poco::Util
|
||||
{
|
||||
@ -15,15 +16,6 @@ class AbstractConfiguration;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct HttpHeader
|
||||
{
|
||||
String name;
|
||||
String value;
|
||||
|
||||
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
|
||||
};
|
||||
|
||||
using HeaderCollection = std::vector<HttpHeader>;
|
||||
|
||||
struct Settings;
|
||||
|
||||
@ -50,6 +42,21 @@ struct S3Settings
|
||||
&& use_environment_credentials == other.use_environment_credentials
|
||||
&& use_insecure_imds_request == other.use_insecure_imds_request;
|
||||
}
|
||||
|
||||
void updateFrom(const AuthSettings & from)
|
||||
{
|
||||
/// Update with check for emptyness only parameters which
|
||||
/// can be passed not only from config, but via ast.
|
||||
|
||||
if (!from.access_key_id.empty())
|
||||
access_key_id = from.access_key_id;
|
||||
if (!from.secret_access_key.empty())
|
||||
secret_access_key = from.secret_access_key;
|
||||
|
||||
headers = from.headers;
|
||||
region = from.region;
|
||||
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
|
||||
}
|
||||
};
|
||||
|
||||
struct ReadWriteSettings
|
||||
@ -94,7 +101,6 @@ struct S3Settings
|
||||
class StorageS3Settings
|
||||
{
|
||||
public:
|
||||
StorageS3Settings() = default;
|
||||
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
|
||||
|
||||
S3Settings getSettings(const String & endpoint) const;
|
||||
|
@ -1018,7 +1018,7 @@ ASTs::iterator StorageURL::collectHeaders(
|
||||
if (arg_value.getType() != Field::Types::Which::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
|
||||
|
||||
configuration.headers.emplace_back(arg_name, arg_value);
|
||||
configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
|
||||
}
|
||||
|
||||
headers_it = arg_it;
|
||||
@ -1096,10 +1096,9 @@ void registerStorageURL(StorageFactory & factory)
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
||||
for (const auto & [header, value] : configuration.headers)
|
||||
{
|
||||
auto value_literal = value.safeGet<String>();
|
||||
if (header == "Range")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||
headers.emplace_back(std::make_pair(header, value_literal));
|
||||
headers.emplace_back(header, value);
|
||||
}
|
||||
|
||||
ASTPtr partition_by;
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include "registerTableFunctions.h"
|
||||
#include <filesystem>
|
||||
@ -40,6 +41,10 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
|
||||
if (args.empty() || args.size() > 6)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
|
||||
|
||||
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
|
||||
if (header_it != args.end())
|
||||
args.erase(header_it);
|
||||
|
||||
for (auto & arg : args)
|
||||
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
|
||||
|
||||
@ -135,15 +140,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration.compression_method,
|
||||
false,
|
||||
std::nullopt,
|
||||
context);
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
@ -161,19 +158,14 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
|
||||
columns = structure_hint;
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
String{},
|
||||
context,
|
||||
/// No format_settings for table function S3
|
||||
std::nullopt,
|
||||
configuration.compression_method);
|
||||
std::nullopt);
|
||||
|
||||
storage->startup();
|
||||
|
||||
|
@ -82,19 +82,10 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
|
||||
|
||||
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
return StorageS3::getTableStructureFromData(
|
||||
configuration.format,
|
||||
S3::URI(Poco::URI(configuration.url)),
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration.compression_method,
|
||||
false,
|
||||
std::nullopt,
|
||||
context);
|
||||
}
|
||||
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
@ -104,46 +95,38 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
|
||||
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
StoragePtr storage;
|
||||
|
||||
ColumnsDescription columns;
|
||||
|
||||
if (configuration.structure != "auto")
|
||||
{
|
||||
columns = parseColumnsListFromString(configuration.structure, context);
|
||||
}
|
||||
else if (!structure_hint.empty())
|
||||
{
|
||||
columns = structure_hint;
|
||||
}
|
||||
|
||||
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
|
||||
{
|
||||
/// On worker node this filename won't contains globs
|
||||
Poco::URI uri (configuration.url);
|
||||
S3::URI s3_uri (uri);
|
||||
storage = std::make_shared<StorageS3>(
|
||||
s3_uri,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.format,
|
||||
configuration.rw_settings,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
String{},
|
||||
/* comment */String{},
|
||||
context,
|
||||
// No format_settings for S3Cluster
|
||||
std::nullopt,
|
||||
configuration.compression_method,
|
||||
/* format_settings */std::nullopt, /// No format_settings for S3Cluster
|
||||
/*distributed_processing=*/true);
|
||||
}
|
||||
else
|
||||
{
|
||||
storage = std::make_shared<StorageS3Cluster>(
|
||||
configuration.url,
|
||||
configuration.auth_settings.access_key_id,
|
||||
configuration.auth_settings.secret_access_key,
|
||||
configuration,
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
configuration.cluster_name, configuration.format,
|
||||
columns,
|
||||
ConstraintsDescription{},
|
||||
context,
|
||||
configuration.compression_method);
|
||||
context);
|
||||
}
|
||||
|
||||
storage->startup();
|
||||
|
@ -103,10 +103,9 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
||||
for (const auto & [header, value] : configuration.headers)
|
||||
{
|
||||
auto value_literal = value.safeGet<String>();
|
||||
if (header == "Range")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||
headers.emplace_back(std::make_pair(header, value_literal));
|
||||
headers.emplace_back(header, value);
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
@ -13,9 +13,7 @@
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
<main><disk>s3</disk></main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
|
@ -48,8 +48,8 @@
|
||||
<s3_cache>
|
||||
<type>cache</type>
|
||||
<disk>s3_disk</disk>
|
||||
<path>s3_disk_cache/</path>
|
||||
<max_size>22548578304</max_size>
|
||||
<path>s3_cache/</path>
|
||||
<max_size>2147483648</max_size>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
|
||||
</s3_cache>
|
||||
@ -57,8 +57,9 @@
|
||||
<type>cache</type>
|
||||
<disk>s3_disk_2</disk>
|
||||
<path>s3_cache_2/</path>
|
||||
<max_size>22548578304</max_size>
|
||||
<max_size>2Gi</max_size>
|
||||
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
|
||||
<max_file_segment_size>100Mi</max_file_segment_size>
|
||||
</s3_cache_2>
|
||||
<s3_cache_3>
|
||||
<type>cache</type>
|
||||
|
@ -25,19 +25,17 @@ def cluster():
|
||||
global uuids
|
||||
for i in range(3):
|
||||
node1.query(
|
||||
""" CREATE TABLE data{} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def';""".format(
|
||||
i
|
||||
)
|
||||
f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
|
||||
)
|
||||
node1.query(
|
||||
"INSERT INTO data{} SELECT number FROM numbers(500000 * {})".format(
|
||||
i, i + 1
|
||||
|
||||
for _ in range(10):
|
||||
node1.query(
|
||||
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
|
||||
)
|
||||
)
|
||||
expected = node1.query("SELECT * FROM data{} ORDER BY id".format(i))
|
||||
expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
|
||||
|
||||
metadata_path = node1.query(
|
||||
"SELECT data_paths FROM system.tables WHERE name='data{}'".format(i)
|
||||
f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
|
||||
)
|
||||
metadata_path = metadata_path[
|
||||
metadata_path.find("/") : metadata_path.rfind("/") + 1
|
||||
@ -84,7 +82,7 @@ def test_usage(cluster, node_name):
|
||||
result = node2.query("SELECT * FROM test{} settings max_threads=20".format(i))
|
||||
|
||||
result = node2.query("SELECT count() FROM test{}".format(i))
|
||||
assert int(result) == 500000 * (i + 1)
|
||||
assert int(result) == 5000000 * (i + 1)
|
||||
|
||||
result = node2.query(
|
||||
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
|
||||
@ -123,7 +121,7 @@ def test_incorrect_usage(cluster):
|
||||
)
|
||||
|
||||
result = node2.query("SELECT count() FROM test0")
|
||||
assert int(result) == 500000
|
||||
assert int(result) == 5000000
|
||||
|
||||
result = node2.query_and_get_error("ALTER TABLE test0 ADD COLUMN col1 Int32 first")
|
||||
assert "Table is read-only" in result
|
||||
@ -169,7 +167,7 @@ def test_cache(cluster, node_name):
|
||||
assert int(result) > 0
|
||||
|
||||
result = node2.query("SELECT count() FROM test{}".format(i))
|
||||
assert int(result) == 500000 * (i + 1)
|
||||
assert int(result) == 5000000 * (i + 1)
|
||||
|
||||
result = node2.query(
|
||||
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
|
||||
|
@ -3,6 +3,12 @@ import time
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
# FIXME Test is temporarily disabled due to flakyness
|
||||
# https://github.com/ClickHouse/ClickHouse/issues/39700
|
||||
|
||||
pytestmark = pytest.mark.skip
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.utility import generate_values, replace_config, SafeThread
|
||||
|
||||
|
@ -110,6 +110,10 @@ def started_cluster():
|
||||
main_configs=["configs/defaultS3.xml"],
|
||||
user_configs=["configs/s3_max_redirects.xml"],
|
||||
)
|
||||
cluster.add_instance(
|
||||
"s3_non_default",
|
||||
with_minio=True,
|
||||
)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
@ -1689,3 +1693,22 @@ def test_schema_inference_cache(started_cluster):
|
||||
|
||||
test("s3")
|
||||
test("url")
|
||||
|
||||
|
||||
def test_ast_auth_headers(started_cluster):
|
||||
bucket = started_cluster.minio_restricted_bucket
|
||||
instance = started_cluster.instances["s3_non_default"] # type: ClickHouseInstance
|
||||
filename = "test.csv"
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV')"
|
||||
)
|
||||
|
||||
assert "Forbidden Error" in result
|
||||
assert "S3_ERROR" in result
|
||||
|
||||
result = instance.query(
|
||||
f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
|
||||
)
|
||||
|
||||
assert result.strip() == "1\t2\t3"
|
||||
|
@ -15,6 +15,8 @@ INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 1
|
||||
|
||||
ALTER TABLE partslost_0 ADD INDEX idx x TYPE tokenbf_v1(285000, 3, 12345) GRANULARITY 3;
|
||||
|
||||
SET mutations_sync = 2;
|
||||
|
||||
ALTER TABLE partslost_0 MATERIALIZE INDEX idx;
|
||||
|
||||
-- In worst case doesn't check anything, but it's not flaky
|
||||
|
@ -1 +1,2 @@
|
||||
22548578304 1048576 104857600 1 0 0 0 s3_disk_cache/ 0
|
||||
2147483648 1048576 104857600 1 0 0 0 s3_cache/ 0
|
||||
2147483648 1048576 104857600 0 0 0 0 s3_cache_2/ 0
|
||||
|
@ -1,3 +1,4 @@
|
||||
-- Tags: no-fasttest
|
||||
|
||||
DESCRIBE FILESYSTEM CACHE 's3_cache';
|
||||
DESCRIBE FILESYSTEM CACHE 's3_cache_2';
|
||||
|
@ -1,3 +1,5 @@
|
||||
-- Tags: no-backward-compatibility-check
|
||||
|
||||
drop table if exists test_02381;
|
||||
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
|
||||
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;
|
||||
|
@ -0,0 +1,22 @@
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<console>true</console>
|
||||
</logger>
|
||||
|
||||
<tcp_port>9000</tcp_port>
|
||||
<allow_implicit_no_password>0</allow_implicit_no_password>
|
||||
<path>.</path>
|
||||
<mark_cache_size>0</mark_cache_size>
|
||||
<!-- Sources to read users, roles, access rights, profiles of settings, quotas. -->
|
||||
<user_directories>
|
||||
<users_xml>
|
||||
<!-- Path to configuration file with predefined users. -->
|
||||
<path>users.xml</path>
|
||||
</users_xml>
|
||||
<local_directory>
|
||||
<!-- Path to folder where users created by SQL commands are stored. -->
|
||||
<path>./</path>
|
||||
</local_directory>
|
||||
</user_directories>
|
||||
</clickhouse>
|
85
tests/queries/0_stateless/02422_allow_implicit_no_password.sh
Executable file
85
tests/queries/0_stateless/02422_allow_implicit_no_password.sh
Executable file
@ -0,0 +1,85 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-parallel, no-fasttest
|
||||
# Tag no-tsan: requires jemalloc to track small allocations
|
||||
# Tag no-asan: requires jemalloc to track small allocations
|
||||
# Tag no-ubsan: requires jemalloc to track small allocations
|
||||
# Tag no-msan: requires jemalloc to track small allocations
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
cp /etc/clickhouse-server/users.xml "$CURDIR"/users.xml
|
||||
sed -i 's/<password><\/password>/<password_sha256_hex>c64c5e4e53ea1a9f1427d2713b3a22bbebe8940bc807adaf654744b1568c70ab<\/password_sha256_hex>/g' "$CURDIR"/users.xml
|
||||
sed -i 's/<!-- <access_management>1<\/access_management> -->/<access_management>1<\/access_management>/g' "$CURDIR"/users.xml
|
||||
|
||||
server_opts=(
|
||||
"--config-file=$CURDIR/$(basename "${BASH_SOURCE[0]}" .sh).config.xml"
|
||||
"--"
|
||||
# to avoid multiple listen sockets (complexity for port discovering)
|
||||
"--listen_host=127.1"
|
||||
# we will discover the real port later.
|
||||
"--tcp_port=0"
|
||||
"--shutdown_wait_unfinished=0"
|
||||
)
|
||||
|
||||
CLICKHOUSE_WATCHDOG_ENABLE=0 $CLICKHOUSE_SERVER_BINARY "${server_opts[@]}" &> clickhouse-server.stderr &
|
||||
server_pid=$!
|
||||
|
||||
server_port=
|
||||
i=0 retries=300
|
||||
# wait until server will start to listen (max 30 seconds)
|
||||
while [[ -z $server_port ]] && [[ $i -lt $retries ]]; do
|
||||
server_port=$(lsof -n -a -P -i tcp -s tcp:LISTEN -p $server_pid 2>/dev/null | awk -F'[ :]' '/LISTEN/ { print $(NF-1) }')
|
||||
((++i))
|
||||
sleep 0.1
|
||||
if ! kill -0 $server_pid >& /dev/null; then
|
||||
echo "No server (pid $server_pid)"
|
||||
break
|
||||
fi
|
||||
done
|
||||
if [[ -z $server_port ]]; then
|
||||
echo "Cannot wait for LISTEN socket" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# wait for the server to start accepting tcp connections (max 30 seconds)
|
||||
i=0 retries=300
|
||||
while ! $CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" --format Null -q 'select 1' 2>/dev/null && [[ $i -lt $retries ]]; do
|
||||
sleep 0.1
|
||||
if ! kill -0 $server_pid >& /dev/null; then
|
||||
echo "No server (pid $server_pid)"
|
||||
break
|
||||
fi
|
||||
done
|
||||
|
||||
|
||||
if ! $CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" --format Null -q 'select 1'; then
|
||||
echo "Cannot wait until server will start accepting connections on <tcp_port>" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "DROP USER IF EXISTS u1_02422, u2_02422, u3_02422";
|
||||
|
||||
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u1_02422" " -- { serverError 516 } --" &> /dev/null ;
|
||||
|
||||
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u2_02422 IDENTIFIED WITH no_password "
|
||||
|
||||
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "CREATE USER u3_02422 IDENTIFIED BY 'qwe123'";
|
||||
|
||||
$CLICKHOUSE_CLIENT_BINARY -u default --password='1w2swhb1' --host 127.1 --port "$server_port" -q "DROP USER u2_02422, u3_02422";
|
||||
|
||||
|
||||
# no sleep, since flushing to stderr should not be buffered.
|
||||
grep 'User is not allowed to Create users' clickhouse-server.stderr
|
||||
|
||||
|
||||
# send TERM and save the error code to ensure that it is 0 (EXIT_SUCCESS)
|
||||
kill $server_pid
|
||||
wait $server_pid
|
||||
return_code=$?
|
||||
|
||||
rm -f clickhouse-server.stderr
|
||||
rm -f "$CURDIR"/users.xml
|
||||
|
||||
exit $return_code
|
@ -0,0 +1,3 @@
|
||||
insert into function file(02453_data.jsonl, TSV) select 1 settings engine_file_truncate_on_insert=1;
|
||||
select * from file(02453_data.jsonl, auto, 'x UInt32') settings input_format_allow_errors_num=1, input_format_record_errors_file_path='../error_file'; -- {serverError DATABASE_ACCESS_DENIED}
|
||||
|
@ -1,3 +1,5 @@
|
||||
v22.9.2.7-stable 2022-09-23
|
||||
v22.9.1.2603-stable 2022-09-22
|
||||
v22.8.5.29-lts 2022-09-13
|
||||
v22.8.4.7-lts 2022-08-31
|
||||
v22.8.3.13-lts 2022-08-29
|
||||
|
|
Loading…
Reference in New Issue
Block a user