Merge branch 'master' into revert_SingleValueDataString

Raúl Marín 2022-11-21 15:11:13 +01:00 committed by GitHub
commit 35e9e169a1
GPG Key ID: 4AEE18F83AFDEB23
88 changed files with 2344 additions and 542 deletions


View File

@ -38,7 +38,7 @@ jobs:
with:
ref: master
fetch-depth: 0
- name: Generate versions
- name: Update versions, docker version, changelog, security
env:
GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
run: |
@ -51,6 +51,7 @@ jobs:
--gh-user-or-token="$GITHUB_TOKEN" --jobs=5 \
--output="/ClickHouse/docs/changelogs/${GITHUB_TAG}.md" "${GITHUB_TAG}"
git add "./docs/changelogs/${GITHUB_TAG}.md"
python ./utils/security-generator/generate_security.py > SECURITY.md
git diff HEAD
- name: Create Pull Request
uses: peter-evans/create-pull-request@v3

View File

@ -1,3 +1,6 @@
<!--
the file is autogenerated by utils/security-generator/generate_security.py
-->
# Security Policy
@ -62,5 +65,5 @@ As the security issue moves from triage, to identified fix, to release planning
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.

View File

@ -254,7 +254,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
start
./stress --hung-check --drop-databases --output-folder test_output --skip-func-tests "$SKIP_TESTS_OPTION" \
./stress --hung-check --drop-databases --output-folder test_output --skip-func-tests "$SKIP_TESTS_OPTION" --global-time-limit 1200 \
&& echo -e 'Test script exit code\tOK' >> /test_output/test_results.tsv \
|| echo -e 'Test script failed\tFAIL' >> /test_output/test_results.tsv

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.8.9.24-lts (a1b69551d40) FIXME as compared to v22.8.8.3-lts (ac5a6cababc)
#### Performance Improvement
* Backported in [#43012](https://github.com/ClickHouse/ClickHouse/issues/43012): Keeper performance improvement: improve commit performance for cases when many different nodes have uncommitted states. This should help with cases when a follower node can't sync fast enough. [#42926](https://github.com/ClickHouse/ClickHouse/pull/42926) ([Antonio Andelic](https://github.com/antonio2368)).
#### Improvement
* Backported in [#42840](https://github.com/ClickHouse/ClickHouse/issues/42840): Update tzdata to 2022f. Mexico will no longer observe DST except near the US border: https://www.timeanddate.com/news/time/mexico-abolishes-dst-2022.html. Chihuahua moves to year-round UTC-6 on 2022-10-30. Fiji no longer observes DST. See https://github.com/google/cctz/pull/235 and https://bugs.launchpad.net/ubuntu/+source/tzdata/+bug/1995209. [#42796](https://github.com/ClickHouse/ClickHouse/pull/42796) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### Build/Testing/Packaging Improvement
* Backported in [#42964](https://github.com/ClickHouse/ClickHouse/issues/42964): Before the fix, the user-defined config was preserved by RPM in `$file.rpmsave`. The PR fixes it and won't replace the user's files from packages. [#42936](https://github.com/ClickHouse/ClickHouse/pull/42936) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#43040](https://github.com/ClickHouse/ClickHouse/issues/43040): Add a CI step to mark commits as ready for release; soft-forbid launching a release script from branches but master. [#43017](https://github.com/ClickHouse/ClickHouse/pull/43017) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#42720](https://github.com/ClickHouse/ClickHouse/issues/42720): Fixed `Unknown identifier (aggregate-function)` exception which appears when a user tries to calculate WINDOW ORDER BY/PARTITION BY expressions over aggregate functions: ``` CREATE TABLE default.tenk1 ( `unique1` Int32, `unique2` Int32, `ten` Int32 ) ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 8192; SELECT ten, sum(unique1) + sum(unique2) AS res, rank() OVER (ORDER BY sum(unique1) + sum(unique2) ASC) AS rank FROM _complex GROUP BY ten ORDER BY ten ASC; ``` which gives: ``` Code: 47. DB::Exception: Received from localhost:9000. DB::Exception: Unknown identifier: sum(unique1); there are columns: unique1, unique2, ten: While processing sum(unique1) + sum(unique2) ASC. (UNKNOWN_IDENTIFIER) ```. [#39762](https://github.com/ClickHouse/ClickHouse/pull/39762) ([Vladimir Chebotaryov](https://github.com/quickhouse)).
* Backported in [#42748](https://github.com/ClickHouse/ClickHouse/issues/42748): A segmentation fault related to DNS & c-ares has been reported. The error below occurred in multiple threads: ``` 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008088 [ 356 ] {} <Fatal> BaseDaemon: ######################################## 2022-09-28 15:41:19.008,"2022.09.28 15:41:19.008147 [ 356 ] {} <Fatal> BaseDaemon: (version 22.8.5.29 (official build), build id: 92504ACA0B8E2267) (from thread 353) (no query) Received signal Segmentation fault (11)" 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008196 [ 356 ] {} <Fatal> BaseDaemon: Address: 0xf Access: write. Address not mapped to object. 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008216 [ 356 ] {} <Fatal> BaseDaemon: Stack trace: 0x188f8212 0x1626851b 0x1626a69e 0x16269b3f 0x16267eab 0x13cf8284 0x13d24afc 0x13c5217e 0x14ec2495 0x15ba440f 0x15b9d13b 0x15bb2699 0x1891ccb3 0x1891e00d 0x18ae0769 0x18ade022 0x7f76aa985609 0x7f76aa8aa133 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008274 [ 356 ] {} <Fatal> BaseDaemon: 2. Poco::Net::IPAddress::family() const @ 0x188f8212 in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008297 [ 356 ] {} <Fatal> BaseDaemon: 3. ? @ 0x1626851b in /usr/bin/clickhouse 2022-09-28 15:41:19.008,2022.09.28 15:41:19.008309 [ 356 ] {} <Fatal> BaseDaemon: 4. ? @ 0x1626a69e in /usr/bin/clickhouse ```. [#42234](https://github.com/ClickHouse/ClickHouse/pull/42234) ([Arthur Passos](https://github.com/arthurpassos)).
* Backported in [#43062](https://github.com/ClickHouse/ClickHouse/issues/43062): Fix rare NOT_FOUND_COLUMN_IN_BLOCK error when projection is possible to use but there is no projection available. This fixes [#42771](https://github.com/ClickHouse/ClickHouse/issues/42771) . The bug was introduced in https://github.com/ClickHouse/ClickHouse/pull/25563. [#42938](https://github.com/ClickHouse/ClickHouse/pull/42938) ([Amos Bird](https://github.com/amosbird)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Do not warn about kvm-clock [#41217](https://github.com/ClickHouse/ClickHouse/pull/41217) ([Sergei Trifonov](https://github.com/serxa)).
* Revert revert 41268 disable s3 parallel write for part moves to disk s3 [#42617](https://github.com/ClickHouse/ClickHouse/pull/42617) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Always run `BuilderReport` and `BuilderSpecialReport` in all CI types [#42684](https://github.com/ClickHouse/ClickHouse/pull/42684) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -57,7 +57,7 @@ Internal coordination settings are located in the `<keeper_server>.<coordination
- `auto_forwarding` — Allow to forward write requests from followers to the leader (default: true).
- `shutdown_timeout` — Wait to finish internal connections and shutdown (ms) (default: 5000).
- `startup_timeout` — If the server doesn't connect to other quorum participants in the specified timeout it will terminate (ms) (default: 30000).
- `four_letter_word_white_list` — White list of 4lw commands (default: `conf,cons,crst,envi,ruok,srst,srvr,stat,wchc,wchs,dirs,mntr,isro`).
- `four_letter_word_white_list` — White list of 4lw commands (default: `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld`).
Quorum configuration is located in the `<keeper_server>.<raft_configuration>` section and contains the server descriptions.
@ -126,7 +126,7 @@ clickhouse keeper --config /etc/your_path_to_config/config.xml
ClickHouse Keeper also provides 4lw commands which are almost the same as in ZooKeeper. Each command is composed of four letters, such as `mntr`, `stat`, etc. Some of the more interesting commands: `stat` gives some general information about the server and connected clients, while `srvr` and `cons` give extended details on the server and connections, respectively.
The 4lw commands have a white list configuration, `four_letter_word_white_list`, whose default value is `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif`.
The 4lw commands have a white list configuration, `four_letter_word_white_list`, whose default value is `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld`.
You can issue the commands to ClickHouse Keeper via telnet or nc, at the client port.
@ -328,6 +328,12 @@ target_committed_log_idx 101
last_snapshot_idx 50
```
- `rqld`: Request to become the new leader. Returns `Sent leadership request to leader.` if the request was sent, or `Failed to send leadership request to leader.` if it was not. Note that if the node is already the leader, the outcome is the same as when the request is sent.
```
Sent leadership request to leader.
```
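As a rough illustration of how a comma-separated list such as `four_letter_word_white_list` can gate a command like `rqld`, here is a minimal C++ sketch. The helpers `parseAllowList` and `isCommandAllowed` are hypothetical names introduced for this example; they are not ClickHouse Keeper functions, and the real parsing logic may differ.

```cpp
#include <set>
#include <sstream>
#include <string>

// Hypothetical helper (not a ClickHouse API): split a comma-separated
// allow list into a set of command names.
std::set<std::string> parseAllowList(const std::string & list)
{
    std::set<std::string> commands;
    std::istringstream stream(list);
    std::string token;
    while (std::getline(stream, token, ','))
    {
        // Trim surrounding spaces so "stat, rqld" behaves like "stat,rqld".
        const auto first = token.find_first_not_of(' ');
        if (first == std::string::npos)
            continue; // skip empty entries
        const auto last = token.find_last_not_of(' ');
        commands.insert(token.substr(first, last - first + 1));
    }
    return commands;
}

// Hypothetical helper: a command may run only if it is present in the list.
bool isCommandAllowed(const std::set<std::string> & allow_list, const std::string & command)
{
    return allow_list.count(command) > 0;
}
```

Under this model, a server configured with the previous default list would reject `rqld`, while the new default list accepts it.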
## Migration from ZooKeeper {#migration-from-zookeeper}
Seamless migration from ZooKeeper to ClickHouse Keeper is impossible: you have to stop your ZooKeeper cluster, convert the data, and start ClickHouse Keeper. The `clickhouse-keeper-converter` tool converts ZooKeeper logs and snapshots to a ClickHouse Keeper snapshot. It works only with ZooKeeper > 3.4. Steps for migration:

View File

@ -189,10 +189,12 @@ preAllocSize=131072
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.
# system. The default limit is 1000.
# globalOutstandingLimit=1000
# ZooKeeper logs transactions to a transaction log. After snapCount transactions
# are written to a log file a snapshot is started and a new transaction log file
# is started. The default snapCount is 100000.
snapCount=3000000
# If this option is defined, requests will be logged to a trace file named

View File

@ -160,7 +160,7 @@ void ClusterCopierApp::mainImpl()
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
static const std::string default_database = "_local";

View File

@ -176,7 +176,7 @@ int DisksApp::main(const std::vector<String> & /*args*/)
Poco::Logger::root().setLevel(Poco::Logger::parseLevel(log_level));
}
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
shared_context = Context::createShared();

View File

@ -413,7 +413,7 @@ try
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
processConfig();

View File

@ -679,7 +679,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ false);
registerFormats();
registerRemoteFileMetadatas();

View File

@ -135,6 +135,9 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
if (source_size < 2)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
if (uncompressed_size == 0)
return;
UInt8 bytes_size = source[0];
if (bytes_size == 0)

View File

@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
}
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif";
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST)

View File

@ -142,6 +142,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr log_info_command = std::make_shared<LogInfoCommand>(keeper_dispatcher);
factory.registerCommand(log_info_command);
FourLetterCommandPtr request_leader_command = std::make_shared<RequestLeaderCommand>(keeper_dispatcher);
factory.registerCommand(request_leader_command);
factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true);
}
@ -507,4 +510,9 @@ String LogInfoCommand::run()
return ret.str();
}
String RequestLeaderCommand::run()
{
return keeper_dispatcher.requestLeader() ? "Sent leadership request to leader." : "Failed to send leadership request to leader.";
}
}

View File

@ -364,4 +364,17 @@ struct LogInfoCommand : public IFourLetterCommand
~LogInfoCommand() override = default;
};
/// Request to be leader.
struct RequestLeaderCommand : public IFourLetterCommand
{
explicit RequestLeaderCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "rqld"; }
String run() override;
~RequestLeaderCommand() override = default;
};
}

View File

@ -215,6 +215,12 @@ public:
{
return server->getKeeperLogInfo();
}
/// Request to be leader.
bool requestLeader()
{
return server->requestLeader();
}
};
}

View File

@ -932,4 +932,9 @@ KeeperLogInfo KeeperServer::getKeeperLogInfo()
return log_info;
}
bool KeeperServer::requestLeader()
{
return isLeader() || raft_instance->request_leadership();
}
}
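The short-circuit in `KeeperServer::requestLeader()` above means that a node which is already the leader reports success without sending anything. A minimal sketch of that behaviour follows, using a hypothetical `FakeRaftNode` struct in place of the real Raft instance; the struct and its fields are assumptions made for illustration only.

```cpp
#include <string>

// Hypothetical stand-in for a Raft node; not a NuRaft or ClickHouse type.
struct FakeRaftNode
{
    bool is_leader;        // whether this node is currently the leader
    bool request_succeeds; // whether sending a leadership request would succeed
    int requests_sent;     // how many leadership requests were actually sent
};

// Models `isLeader() || request_leadership()`: no request is sent by a leader.
bool requestLeader(FakeRaftNode & node)
{
    if (node.is_leader)
        return true;
    ++node.requests_sent;
    return node.request_succeeds;
}

// Models the `rqld` response strings shown in the diff.
std::string runRqld(FakeRaftNode & node)
{
    return requestLeader(node) ? "Sent leadership request to leader."
                               : "Failed to send leadership request to leader.";
}
```

In this model a leader and a follower whose request goes through both produce the same response, which matches the note in the `rqld` documentation.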

View File

@ -135,6 +135,8 @@ public:
uint64_t createSnapshot();
KeeperLogInfo getKeeperLogInfo();
bool requestLeader();
};
}

View File

@ -4,7 +4,10 @@
namespace DB
{
DiskDecorator::DiskDecorator(const DiskPtr & delegate_) : delegate(delegate_)
DiskDecorator::DiskDecorator(const DiskPtr & delegate_)
: IDisk(/* name_= */ "<decorator>")
, delegate(delegate_)
{
}
@ -226,9 +229,9 @@ void DiskDecorator::shutdown()
delegate->shutdown();
}
void DiskDecorator::startup(ContextPtr context)
void DiskDecorator::startupImpl(ContextPtr context)
{
delegate->startup(context);
delegate->startupImpl(context);
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)

View File

@ -74,12 +74,14 @@ public:
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
bool isRemote() const override { return delegate->isRemote(); }
bool isReadOnly() const override { return delegate->isReadOnly(); }
bool isWriteOnce() const override { return delegate->isWriteOnce(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup(ContextPtr context) override;
void startupImpl(ContextPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
bool supportsCache() const override { return delegate->supportsCache(); }

View File

@ -210,7 +210,7 @@ DiskEncrypted::DiskEncrypted(
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: DiskDecorator(settings_->wrapped_disk)
, name(name_)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
@ -369,15 +369,19 @@ void DiskEncrypted::applyNewSettings(
current_settings.set(std::move(new_settings));
}
void registerDiskEncrypted(DiskFactory & factory)
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr /*context*/,
const DisksMap & map) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
{
return std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("encrypted", creator);
}

View File

@ -33,7 +33,7 @@ public:
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return name; }
const String & getName() const override { return encrypted_name; }
const String & getPath() const override { return disk_absolute_path; }
ReservationPtr reserve(UInt64 bytes) override;
@ -261,7 +261,7 @@ private:
return disk_path + path;
}
const String name;
const String encrypted_name;
const String disk_path;
const String disk_absolute_path;
MultiVersion<DiskEncryptedSettings> current_settings;

View File

@ -500,7 +500,7 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
}
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_)
: IDisk(name_)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
@ -528,26 +528,6 @@ DataSourceDescription DiskLocal::getDataSourceDescription() const
return data_source_description;
}
void DiskLocal::startup(ContextPtr)
{
try
{
broken = false;
disk_checker_magic_number = -1;
disk_checker_can_check_read = true;
readonly = !setup();
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
}
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
}
void DiskLocal::shutdown()
{
if (disk_checker)
@ -641,18 +621,30 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
);
}
bool DiskLocal::setup()
void DiskLocal::checkAccessImpl(const String & path)
{
try
{
fs::create_directories(disk_path);
if (!FS::canWrite(disk_path))
{
LOG_ERROR(logger, "Cannot write to the root directory of disk {} ({}).", name, disk_path);
readonly = true;
return;
}
}
catch (...)
{
LOG_ERROR(logger, "Cannot create the directory of disk {} ({}).", name, disk_path);
throw;
LOG_ERROR(logger, "Cannot create the root directory of disk {} ({}).", name, disk_path);
readonly = true;
return;
}
IDisk::checkAccessImpl(path);
}
void DiskLocal::setup()
{
try
{
if (!FS::canRead(disk_path))
@ -666,7 +658,7 @@ bool DiskLocal::setup()
/// If disk checker is disabled, just assume RW by default.
if (!disk_checker)
return true;
return;
try
{
@ -690,6 +682,7 @@ bool DiskLocal::setup()
/// Try to create a new checker file. The disk status can be either broken or readonly.
if (disk_checker_magic_number == -1)
{
try
{
pcg32_fast rng(randomSeed());
@ -709,12 +702,33 @@ bool DiskLocal::setup()
disk_checker_path,
name);
disk_checker_can_check_read = false;
return true;
return;
}
}
if (disk_checker_magic_number == -1)
throw Exception("disk_checker_magic_number is not initialized. It's a bug", ErrorCodes::LOGICAL_ERROR);
return true;
}
void DiskLocal::startupImpl(ContextPtr)
{
broken = false;
disk_checker_magic_number = -1;
disk_checker_can_check_read = true;
try
{
setup();
}
catch (...)
{
tryLogCurrentException(logger, fmt::format("Disk {} is marked as broken during startup", name));
broken = true;
/// Disk checker is disabled when failing to start up.
disk_checker_can_check_read = false;
}
if (disk_checker && disk_checker_can_check_read)
disk_checker->startup();
}
struct stat DiskLocal::stat(const String & path) const
@ -741,13 +755,14 @@ MetadataStoragePtr DiskLocal::getMetadataStorage()
std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
void registerDiskLocal(DiskFactory & factory)
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & map) -> DiskPtr
{
String path;
UInt64 keep_free_space_bytes;
@ -757,9 +772,10 @@ void registerDiskLocal(DiskFactory & factory)
if (path == disk_ptr->getPath())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup(context);
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("local", creator);

View File

@ -28,8 +28,6 @@ public:
ContextPtr context,
UInt64 local_disk_check_period_ms);
const String & getName() const override { return name; }
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) override;
@ -112,8 +110,9 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
bool isBroken() const override { return broken; }
bool isReadOnly() const override { return readonly; }
void startup(ContextPtr) override;
void startupImpl(ContextPtr context) override;
void shutdown() override;
@ -133,17 +132,19 @@ public:
MetadataStoragePtr getMetadataStorage() override;
protected:
void checkAccessImpl(const String & path) override;
private:
std::optional<UInt64> tryReserve(UInt64 bytes);
/// Setup disk for healthy check. Returns true if it's read-write, false if read-only.
/// Setup disk for healthy check.
/// Throw exception if it's not possible to setup necessary files and directories.
bool setup();
void setup();
/// Read magic number from disk checker file. Return std::nullopt if exception happens.
std::optional<UInt32> readDiskCheckerMagicNumber() const noexcept;
const String name;
const String disk_path;
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;

View File

@ -141,6 +141,11 @@ private:
};
DiskMemory::DiskMemory(const String & name_)
: IDisk(name_)
, disk_path("memory(" + name_ + ')')
{}
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
@ -456,13 +461,20 @@ MetadataStoragePtr DiskMemory::getMetadataStorage()
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
void registerDiskMemory(DiskFactory & factory)
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/,
ContextPtr /*context*/,
const DisksMap & /*map*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskMemory>(name);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("memory", creator);
}

View File

@ -8,7 +8,7 @@
namespace DB
{
class DiskMemory;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
@ -22,9 +22,7 @@ class WriteBufferFromFileBase;
class DiskMemory : public IDisk
{
public:
explicit DiskMemory(const String & name_) : name(name_), disk_path("memory://" + name_ + '/') {}
const String & getName() const override { return name; }
explicit DiskMemory(const String & name_);
const String & getPath() const override { return disk_path; }
@ -121,7 +119,6 @@ private:
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data
const String name;
const String disk_path;
Files files;
mutable std::mutex mutex;

View File

@ -79,7 +79,8 @@ private:
};
DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_)
: DiskDecorator(delegate_) { }
: DiskDecorator(delegate_)
{}
ReservationPtr DiskRestartProxy::reserve(UInt64 bytes)
{
@ -368,7 +369,8 @@ void DiskRestartProxy::restart(ContextPtr context)
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
DiskDecorator::startup(context);
/// NOTE: access checking will cause deadlock here, so skip it.
DiskDecorator::startup(context, /* skip_access_check= */ true);
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());
}

View File

@ -6,6 +6,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/ServerUUID.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/FakeMetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
@ -17,6 +18,8 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_READ_ALL_DATA;
extern const int LOGICAL_ERROR;
}
bool IDisk::isDirectoryEmpty(const String & path) const
@ -126,4 +129,87 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
void IDisk::startup(ContextPtr context, bool skip_access_check)
{
if (!skip_access_check)
{
if (isReadOnly())
{
LOG_DEBUG(&Poco::Logger::get("IDisk"),
"Skip access check for disk {} (read-only disk).",
getName());
}
else
checkAccess();
}
startupImpl(context);
}
void IDisk::checkAccess()
{
DB::UUID server_uuid = DB::ServerUUID::get();
if (server_uuid == DB::UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
const String path = fmt::format("clickhouse_access_check_{}", DB::toString(server_uuid));
checkAccessImpl(path);
}
/// NOTE: should we mark the disk readonly if the write/unlink fails instead of throws?
void IDisk::checkAccessImpl(const String & path)
try
{
const std::string_view payload("test", 4);
/// write
{
auto file = writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
try
{
file->write(payload.data(), payload.size());
}
catch (...)
{
/// Log current exception, because finalize() can throw a different exception.
tryLogCurrentException(__PRETTY_FUNCTION__);
file->finalize();
throw;
}
}
/// read
{
auto file = readFile(path);
String buf(payload.size(), '0');
file->readStrict(buf.data(), buf.size());
if (buf != payload)
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Content of {}::{} does not match after read ({} vs {})", name, path, buf, payload);
}
}
/// read with offset
{
auto file = readFile(path);
auto offset = 2;
String buf(payload.size() - offset, '0');
file->seek(offset, 0);
file->readStrict(buf.data(), buf.size());
if (buf != payload.substr(offset))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Content of {}::{} does not match after read with offset ({} vs {})", name, path, buf, payload.substr(offset));
}
}
/// remove
removeFile(path);
}
catch (Exception & e)
{
e.addMessage(fmt::format("While checking access for disk {}", name));
throw;
}
}
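The access check above can be summarised as: decide whether to check at all, then write a small payload, read it back, read it back from an offset, and remove the file. The following is a simplified sketch of that sequence against a hypothetical in-memory `FakeDisk`; it does not use the real `IDisk` interface, and the names `FakeDisk`, `shouldCheckAccess`, and `checkAccess` are assumptions made for this example.

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical in-memory stand-in for a disk; not a ClickHouse class.
struct FakeDisk
{
    std::map<std::string, std::string> files; // file path -> file contents
};

// Mirrors the decision in the diff: the check is skipped when either the
// global flag or the per-disk `skip_access_check` setting is true, or when
// the disk is read-only.
bool shouldCheckAccess(bool global_skip_access_check, bool disk_skip_access_check, bool read_only)
{
    const bool skip_access_check = global_skip_access_check || disk_skip_access_check;
    return !skip_access_check && !read_only;
}

// Write, read, read with offset, then remove, throwing on any mismatch.
void checkAccess(FakeDisk & disk, const std::string & path)
{
    const std::string payload = "test";

    // write
    disk.files[path] = payload;

    // read
    if (disk.files.at(path) != payload)
        throw std::runtime_error("Content does not match after read");

    // read with offset
    const std::size_t offset = 2;
    if (disk.files.at(path).substr(offset) != payload.substr(offset))
        throw std::runtime_error("Content does not match after read with offset");

    // remove
    disk.files.erase(path);
}
```

A successful check leaves no file behind, which is why the real implementation can use a fixed, server-specific file name for the probe.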

View File

@ -107,8 +107,9 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
: executor(executor_)
explicit IDisk(const String & name_, std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
: name(name_)
, executor(executor_)
{
}
@ -121,6 +122,9 @@ public:
/// It's not required to be a local filesystem path.
virtual const String & getPath() const = 0;
/// Return disk name.
const String & getName() const override { return name; }
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
@ -308,14 +312,19 @@ public:
virtual bool isReadOnly() const { return false; }
virtual bool isWriteOnce() const { return false; }
/// Check if disk is broken. Broken disks will have 0 space and cannot be used.
virtual bool isBroken() const { return false; }
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}
/// Performs action on disk startup.
virtual void startup(ContextPtr) {}
/// Performs access check and custom action on disk startup.
void startup(ContextPtr context, bool skip_access_check);
/// Performs custom action on disk startup.
virtual void startupImpl(ContextPtr) {}
/// Return some unique string for the file; overridden for IDiskRemote.
/// Required to distinguish different copies of the same part on a remote disk.
@ -398,6 +407,8 @@ public:
protected:
friend class DiskDecorator;
const String name;
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
@ -406,8 +417,13 @@ protected:
/// A derived class may override copy() to provide a faster implementation.
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir = true);
virtual void checkAccessImpl(const String & path);
private:
std::shared_ptr<Executor> executor;
/// Check access to the disk.
void checkAccess();
};
using Disks = std::vector<DiskPtr>;

View File

@ -17,55 +17,9 @@
namespace DB
{
namespace ErrorCodes
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check)
{
extern const int PATH_ACCESS_DENIED;
}
namespace
{
constexpr char test_file[] = "test.txt";
constexpr char test_str[] = "test";
constexpr size_t test_str_size = 4;
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile(test_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write(test_str, test_str_size);
}
void checkReadAccess(IDisk & disk)
{
auto file = disk.readFile(test_file);
String buf(test_str_size, '0');
file->readStrict(buf.data(), test_str_size);
if (buf != test_str)
throw Exception("No read access to disk", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkReadWithOffset(IDisk & disk)
{
auto file = disk.readFile(test_file);
auto offset = 2;
auto test_size = test_str_size - offset;
String buf(test_size, '0');
file->seek(offset, 0);
file->readStrict(buf.data(), test_size);
if (buf != test_str + offset)
throw Exception("Failed to read file with offset", ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.removeFile(test_file);
}
}
void registerDiskAzureBlobStorage(DiskFactory & factory)
{
auto creator = [](
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
@ -94,15 +48,8 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
copy_thread_pool_size
);
if (!config.getBool(config_prefix + ".skip_access_check", false))
{
checkWriteAccess(*azure_blob_storage_disk);
checkReadAccess(*azure_blob_storage_disk);
checkReadWithOffset(*azure_blob_storage_disk);
checkRemoveAccess(*azure_blob_storage_disk);
}
azure_blob_storage_disk->startup(context);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
azure_blob_storage_disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(azure_blob_storage_disk);
};
@ -117,7 +64,7 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
namespace DB
{
void registerDiskAzureBlobStorage(DiskFactory &) {}
void registerDiskAzureBlobStorage(DiskFactory &, bool /* global_skip_access_check */) {}
}

View File

@ -101,6 +101,8 @@ public:
bool isReadOnly() const override { return object_storage->isReadOnly(); }
bool isWriteOnce() const override { return object_storage->isWriteOnce(); }
const std::string & getCacheConfigName() const { return cache_config_name; }
ObjectStoragePtr getWrappedObjectStorage() { return object_storage; }

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskCache(DiskFactory & factory)
void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check */)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,

View File

@ -109,8 +109,7 @@ DiskObjectStorage::DiskObjectStorage(
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_)
: IDisk(name_, getAsyncExecutor(log_name, thread_pool_size_))
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, metadata_storage(std::move(metadata_storage_))
@ -420,9 +419,8 @@ void DiskObjectStorage::shutdown()
LOG_INFO(log, "Disk {} shut down", name);
}
void DiskObjectStorage::startup(ContextPtr context)
void DiskObjectStorage::startupImpl(ContextPtr context)
{
LOG_INFO(log, "Starting up disk {}", name);
object_storage->startup();
@ -499,6 +497,11 @@ bool DiskObjectStorage::isReadOnly() const
return object_storage->isReadOnly();
}
bool DiskObjectStorage::isWriteOnce() const
{
return object_storage->isWriteOnce();
}
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
return std::make_shared<DiskObjectStorage>(

View File

@ -45,8 +45,6 @@ public:
bool supportParallelWrite() const override { return object_storage->supportParallelWrite(); }
const String & getName() const override { return name; }
const String & getPath() const override { return metadata_storage->getPath(); }
StoredObjects getStorageObjects(const String & local_path) const override;
@ -138,7 +136,7 @@ public:
void shutdown() override;
void startup(ContextPtr context) override;
void startupImpl(ContextPtr context) override;
ReservationPtr reserve(UInt64 bytes) override;
@ -177,6 +175,12 @@ public:
/// with static files, so only read-only operations are allowed for this storage.
bool isReadOnly() const override;
/// Is object write-once?
/// For example: S3PlainObjectStorage is write once, this means that it
/// does support BACKUP to this disk, but does not support INSERT into
/// MergeTree table on this disk.
bool isWriteOnce() const override;
/// Add a cache layer.
/// Example: DiskObjectStorage(S3ObjectStorage) -> DiskObjectStorage(CachedObjectStorage(S3ObjectStorage))
/// There can be any number of cache layers:
@ -206,7 +210,6 @@ private:
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
const String name;
const String object_storage_root_path;
Poco::Logger * log;

View File

@ -14,13 +14,14 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskHDFS(DiskFactory & factory)
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context_,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
checkHDFSURL(uri);
@ -31,19 +32,20 @@ void registerDiskHDFS(DiskFactory & factory)
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context_->getSettingsRef().hdfs_replication
context->getSettingsRef().hdfs_replication
);
/// FIXME Cache currently unsupported :(
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context_);
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk_result = std::make_shared<DiskObjectStorage>(
DiskPtr disk = std::make_shared<DiskObjectStorage>(
name,
uri,
"DiskHDFS",
@ -51,8 +53,9 @@ void registerDiskHDFS(DiskFactory & factory)
std::move(hdfs_storage),
/* send_metadata = */ false,
copy_thread_pool_size);
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk_result);
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("hdfs", creator);

View File

@ -199,6 +199,7 @@ public:
virtual bool supportsCache() const { return false; }
virtual bool isReadOnly() const { return false; }
virtual bool isWriteOnce() const { return false; }
virtual bool supportParallelWrite() const { return false; }

View File

@ -216,6 +216,11 @@ public:
{
data_source_description.type = DataSourceType::S3_Plain;
}
/// Notes:
/// - supports BACKUP to this disk
/// - does not support INSERT into MergeTree table on this disk
bool isWriteOnce() const override { return true; }
};
}

View File

@ -22,6 +22,7 @@
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Core/ServerUUID.h>
namespace DB
@ -30,90 +31,78 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PATH_ACCESS_DENIED;
extern const int LOGICAL_ERROR;
}
namespace
{
void checkWriteAccess(IDisk & disk)
class CheckAccess
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
try
{
file->write("test", 4);
}
catch (...)
{
/// Log current exception, because finalize() can throw a different exception.
tryLogCurrentException(__PRETTY_FUNCTION__);
file->finalize();
throw;
}
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl");
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.removeFile("test_acl");
}
bool checkBatchRemoveIsMissing(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
StoredObject object(key_with_trailing_slash + "_test_remove_objects_capability");
try
{
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
public:
static bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
/// NOTE: key_with_trailing_slash is the disk prefix, it is required
/// because access is done via S3ObjectStorage not via IDisk interface
/// (since we don't have disk yet).
const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
StoredObject object(path);
try
{
storage.removeObject(object);
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
{
try
{
storage.removeObject(object);
}
catch (...)
{
}
return true; /// We don't have write access, therefore no information about batch remove.
}
return false; /// We don't have write access, therefore no information about batch remove.
}
try
{
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return false;
}
catch (const Exception &)
{
try
{
storage.removeObject(object);
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return true;
}
catch (...)
catch (const Exception &)
{
try
{
storage.removeObject(object);
}
catch (...)
{
}
return false;
}
return true;
}
}
private:
static String getServerUUID()
{
DB::UUID server_uuid = DB::ServerUUID::get();
if (server_uuid == DB::UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return DB::toString(server_uuid);
}
};
}
void registerDiskS3(DiskFactory & factory)
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
@ -144,12 +133,12 @@ void registerDiskS3(DiskFactory & factory)
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
/// NOTE: should we still perform this check for clickhouse-disks?
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!skip_access_check)
{
/// If `support_batch_delete` is turned on (default), check and possibly switch it off.
if (s3_capabilities.support_batch_delete && checkBatchRemoveIsMissing(*s3_storage, uri.key))
if (s3_capabilities.support_batch_delete && !CheckAccess::checkBatchRemove(*s3_storage, uri.key))
{
LOG_WARNING(
&Poco::Logger::get("registerDiskS3"),
@ -165,7 +154,7 @@ void registerDiskS3(DiskFactory & factory)
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
std::shared_ptr<DiskObjectStorage> s3disk = std::make_shared<DiskObjectStorage>(
DiskObjectStoragePtr s3disk = std::make_shared<DiskObjectStorage>(
name,
uri.key,
type == "s3" ? "DiskS3" : "DiskS3Plain",
@ -174,15 +163,7 @@ void registerDiskS3(DiskFactory & factory)
send_metadata,
copy_thread_pool_size);
/// This code is used only to check access to the corresponding disk.
if (!skip_access_check)
{
checkWriteAccess(*s3disk);
checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk);
}
s3disk->startup(context);
s3disk->startup(context, skip_access_check);
std::shared_ptr<IDisk> disk_result = s3disk;
@ -196,6 +177,6 @@ void registerDiskS3(DiskFactory & factory)
#else
void registerDiskS3(DiskFactory &) {}
void registerDiskS3(DiskFactory &, bool /* global_skip_access_check */) {}
#endif

View File

@ -13,7 +13,6 @@ namespace DB
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int NETWORK_ERROR;
}
MetadataStorageFromStaticFilesWebServer::MetadataStorageFromStaticFilesWebServer(
@ -38,7 +37,7 @@ bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) c
if (fs_path.has_extension())
fs_path = fs_path.parent_path();
initializeIfNeeded(fs_path, false);
initializeIfNeeded(fs_path);
if (object_storage.files.empty())
return false;
@ -123,39 +122,21 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
return result;
}
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error) const
void MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const
{
if (object_storage.files.find(path) == object_storage.files.end())
{
try
{
object_storage.initialize(fs::path(object_storage.url) / path);
}
catch (...)
{
const auto message = getCurrentExceptionMessage(false);
bool can_throw = throw_on_error.has_value() ? *throw_on_error : CurrentThread::isInitialized() && CurrentThread::get().getQueryContext();
if (can_throw)
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
return false;
}
object_storage.initialize(fs::path(object_storage.url) / path);
}
return true;
}
DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(const std::string & path) const
{
std::vector<fs::path> dir_file_paths;
if (!initializeIfNeeded(path))
{
initializeIfNeeded(path);
if (!exists(path))
return std::make_unique<StaticDirectoryIterator>(std::move(dir_file_paths));
}
assertExists(path);
for (const auto & [file_path, _] : object_storage.files)
{

View File

@ -19,7 +19,7 @@ private:
void assertExists(const std::string & path) const;
bool initializeIfNeeded(const std::string & path, std::optional<bool> throw_on_error = std::nullopt) const;
void initializeIfNeeded(const std::string & path) const;
public:
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);

View File

@ -46,7 +46,10 @@ void WebObjectStorage::initialize(const String & uri_path) const
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(getContext()),
credentials);
credentials,
/* max_redirects= */ 0,
/* buffer_size_= */ DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings());
String file_name;
FileData file_data{};
@ -82,6 +85,15 @@ void WebObjectStorage::initialize(const String & uri_path) const
files.emplace(std::make_pair(dir_name, FileData({ .type = FileType::Directory })));
}
catch (HTTPException & e)
{
/// 404 - no files
if (e.getHTTPStatus() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND)
return;
e.addMessage("while loading disk metadata");
throw;
}
catch (Exception & e)
{
e.addMessage("while loading disk metadata");

View File

@ -14,15 +14,17 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void registerDiskWebServer(DiskFactory & factory)
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [](const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
auto creator = [global_skip_access_check](
const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri{config.getString(config_prefix + ".endpoint")};
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!uri.ends_with('/'))
throw Exception(
@ -41,7 +43,7 @@ void registerDiskWebServer(DiskFactory & factory)
auto metadata_storage = std::make_shared<MetadataStorageFromStaticFilesWebServer>(assert_cast<const WebObjectStorage &>(*object_storage));
std::string root_path;
return std::make_shared<DiskObjectStorage>(
DiskPtr disk = std::make_shared<DiskObjectStorage>(
disk_name,
root_path,
"DiskWebServer",
@ -49,6 +51,8 @@ void registerDiskWebServer(DiskFactory & factory)
object_storage,
/* send_metadata */false,
/* threadpool_size */16);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("web", creator);

View File

@ -7,55 +7,55 @@
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check);
void registerDiskMemory(DiskFactory & factory, bool global_skip_access_check);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory);
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
void registerDiskAzureBlobStorage(DiskFactory & factory);
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_SSL
void registerDiskEncrypted(DiskFactory & factory);
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_HDFS
void registerDiskHDFS(DiskFactory & factory);
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check);
#endif
void registerDiskWebServer(DiskFactory & factory);
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check);
void registerDiskCache(DiskFactory & factory);
void registerDiskCache(DiskFactory & factory, bool global_skip_access_check);
void registerDisks()
void registerDisks(bool global_skip_access_check)
{
auto & factory = DiskFactory::instance();
registerDiskLocal(factory);
registerDiskMemory(factory);
registerDiskLocal(factory, global_skip_access_check);
registerDiskMemory(factory, global_skip_access_check);
#if USE_AWS_S3
registerDiskS3(factory);
registerDiskS3(factory, global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
registerDiskAzureBlobStorage(factory);
registerDiskAzureBlobStorage(factory, global_skip_access_check);
#endif
#if USE_SSL
registerDiskEncrypted(factory);
registerDiskEncrypted(factory, global_skip_access_check);
#endif
#if USE_HDFS
registerDiskHDFS(factory);
registerDiskHDFS(factory, global_skip_access_check);
#endif
registerDiskWebServer(factory);
registerDiskWebServer(factory, global_skip_access_check);
registerDiskCache(factory);
registerDiskCache(factory, global_skip_access_check);
}
}
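
Reviewer note: each `registerDisk*` function now captures `global_skip_access_check` in its creator lambda and ORs it with the per-disk `.skip_access_check` setting. The sketch below illustrates only that capture-and-combine step. `SketchFactory`, `Config`, `Creator` and `registerSketchDisk` are hypothetical simplifications: the real `DiskFactory` creators return a `DiskPtr` and read from a `Poco::Util::AbstractConfiguration`, whereas here the creator simply returns the effective flag.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical miniature of the configuration: key -> boolean value.
using Config = std::map<std::string, bool>;

// Hypothetical creator signature: returns the effective "skip access check" flag.
using Creator = std::function<bool(const Config & config, const std::string & config_prefix)>;

class SketchFactory
{
public:
    void registerDiskType(const std::string & type, Creator creator)
    {
        creators[type] = std::move(creator);
    }

    bool create(const std::string & type, const Config & config, const std::string & config_prefix) const
    {
        return creators.at(type)(config, config_prefix);
    }

private:
    std::map<std::string, Creator> creators;
};

void registerSketchDisk(SketchFactory & factory, bool global_skip_access_check)
{
    // The flag is captured by value, so it is fixed at registration time.
    auto creator = [global_skip_access_check](const Config & config, const std::string & config_prefix)
    {
        auto it = config.find(config_prefix + ".skip_access_check");
        bool from_config = it != config.end() && it->second; // a missing key counts as false
        return global_skip_access_check || from_config;
    };
    factory.registerDiskType("sketch", creator);
}
```

A tool that registers with `true` (as the `registerDisks(/* global_skip_access_check= */ true)` call in this diff does) skips the check for every disk, while a caller that registers with `false` still honours the per-disk setting.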

View File

@ -2,5 +2,10 @@
namespace DB
{
void registerDisks();
/// @param global_skip_access_check - skip access check regardless of the
/// .skip_access_check config directive (used
/// for clickhouse-disks)
void registerDisks(bool global_skip_access_check);
}

View File

@ -312,15 +312,29 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
|| status == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT /// Reading with Range header was successful.
|| (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
error_message.exceptions(std::ios::failbit);
error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
<< response.getReason() << ", body: " << istr.rdbuf();
int code = status == Poco::Net::HTTPResponse::HTTP_TOO_MANY_REQUESTS
? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
throw Exception(error_message.str(),
status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
std::stringstream body; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
body.exceptions(std::ios::failbit);
body << istr.rdbuf();
throw HTTPException(code, request.getURI(), status, response.getReason(), body.str());
}
}
std::string HTTPException::makeExceptionMessage(
const std::string & uri,
Poco::Net::HTTPResponse::HTTPStatus http_status,
const std::string & reason,
const std::string & body)
{
return fmt::format(
"Received error from remote server {}. "
"HTTP status code: {} {}, "
"body: {}",
uri, http_status, reason, body);
}
}

View File

@ -17,8 +17,6 @@
namespace DB
{
constexpr int HTTP_TOO_MANY_REQUESTS = 429;
class HTTPServerResponse;
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
@ -35,6 +33,38 @@ public:
SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_);
};
class HTTPException : public Exception
{
public:
HTTPException(
int code,
const std::string & uri,
Poco::Net::HTTPResponse::HTTPStatus http_status_,
const std::string & reason,
const std::string & body
)
: Exception(makeExceptionMessage(uri, http_status_, reason, body), code)
, http_status(http_status_)
{}
HTTPException * clone() const override { return new HTTPException(*this); }
void rethrow() const override { throw *this; }
int getHTTPStatus() const { return http_status; }
private:
Poco::Net::HTTPResponse::HTTPStatus http_status{};
static std::string makeExceptionMessage(
const std::string & uri,
Poco::Net::HTTPResponse::HTTPStatus http_status,
const std::string & reason,
const std::string & body);
const char * name() const noexcept override { return "DB::HTTPException"; }
const char * className() const noexcept override { return "DB::HTTPException"; }
};
using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry;
using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
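
Reviewer note: `HTTPException` keeps the HTTP status next to the formatted message, which lets callers such as `WebObjectStorage::initialize` treat 404 as "no files" and rethrow anything else. A rough standalone sketch of that pattern follows. `SketchHTTPError` and `loadListing` are hypothetical names, the class derives from `std::runtime_error` rather than `DB::Exception`, and a plain `int` stands in for `Poco::Net::HTTPResponse::HTTPStatus`.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical exception type that carries the HTTP status with the message.
class SketchHTTPError : public std::runtime_error
{
public:
    SketchHTTPError(const std::string & uri, int status_, const std::string & reason, const std::string & body)
        : std::runtime_error(makeMessage(uri, status_, reason, body))
        , status(status_)
    {
    }

    int getHTTPStatus() const { return status; }

private:
    // Mirrors the message layout used in the diff.
    static std::string makeMessage(const std::string & uri, int status, const std::string & reason, const std::string & body)
    {
        return "Received error from remote server " + uri + ". HTTP status code: "
            + std::to_string(status) + " " + reason + ", body: " + body;
    }

    int status;
};

// Hypothetical loader: returns false when the server answers 404 (nothing to load),
// true on success, and rethrows every other HTTP error unchanged.
template <typename Fetch>
bool loadListing(Fetch && fetch)
{
    try
    {
        fetch();
        return true;
    }
    catch (const SketchHTTPError & e)
    {
        if (e.getHTTPStatus() == 404)
            return false;
        throw;
    }
}
```

Branching on a typed status avoids parsing the message text to recover it.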

View File

@ -1829,9 +1829,22 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (storage && (query.sampleSize() || settings.parallel_replicas_count > 1))
{
Names columns_for_sampling = metadata_snapshot->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sampling.begin(), columns_for_sampling.end());
// we evaluate sampling for Merge lazily so we need to get all the columns
if (storage->getName() == "Merge")
{
const auto columns = metadata_snapshot->getColumns().getAll();
for (const auto & column : columns)
{
additional_required_columns_after_prewhere.push_back(column.name);
}
}
else
{
Names columns_for_sampling = metadata_snapshot->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sampling.begin(), columns_for_sampling.end());
}
}
if (storage && query.final())

View File

@ -33,7 +33,7 @@ try
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
registerDisks(/* global_skip_access_check= */ true);
registerFormats();
return true;

View File

@ -1094,7 +1094,153 @@ bool ParserCollectionOfLiterals<Collection>::parseImpl(Pos & pos, ASTPtr & node,
template bool ParserCollectionOfLiterals<Array>::parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
template bool ParserCollectionOfLiterals<Tuple>::parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
template bool ParserCollectionOfLiterals<Map>::parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
namespace
{
class ICollection;
using Collections = std::vector<std::unique_ptr<ICollection>>;
class ICollection
{
public:
virtual ~ICollection() = default;
virtual bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) = 0;
};
template <class Container, TokenType end_token>
class CommonCollection : public ICollection
{
public:
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) override;
private:
Container container;
};
class MapCollection : public ICollection
{
public:
bool parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected) override;
private:
Map container;
};
bool parseAllCollectionsStart(IParser::Pos & pos, Collections & collections, Expected & /*expected*/)
{
if (pos->type == TokenType::OpeningCurlyBrace)
collections.push_back(std::make_unique<MapCollection>());
else if (pos->type == TokenType::OpeningRoundBracket)
collections.push_back(std::make_unique<CommonCollection<Tuple, TokenType::ClosingRoundBracket>>());
else if (pos->type == TokenType::OpeningSquareBracket)
collections.push_back(std::make_unique<CommonCollection<Array, TokenType::ClosingSquareBracket>>());
else
return false;
++pos;
return true;
}
template <class Container, TokenType end_token>
bool CommonCollection<Container, end_token>::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected)
{
if (node)
{
container.push_back(std::move(node->as<ASTLiteral &>().value));
node.reset();
}
ASTPtr literal;
ParserLiteral literal_p;
ParserToken comma_p(TokenType::Comma);
ParserToken end_p(end_token);
while (true)
{
if (end_p.ignore(pos, expected))
{
node = std::make_shared<ASTLiteral>(std::move(container));
break;
}
if (!container.empty() && !comma_p.ignore(pos, expected))
return false;
if (literal_p.parse(pos, literal, expected))
container.push_back(std::move(literal->as<ASTLiteral &>().value));
else
return parseAllCollectionsStart(pos, collections, expected);
}
return true;
}
bool MapCollection::parse(IParser::Pos & pos, Collections & collections, ASTPtr & node, Expected & expected)
{
if (node)
{
container.push_back(std::move(node->as<ASTLiteral &>().value));
node.reset();
}
ASTPtr literal;
ParserLiteral literal_p;
ParserToken comma_p(TokenType::Comma);
ParserToken colon_p(TokenType::Colon);
ParserToken end_p(TokenType::ClosingCurlyBrace);
while (true)
{
if (end_p.ignore(pos, expected))
{
node = std::make_shared<ASTLiteral>(std::move(container));
break;
}
if (!container.empty() && !comma_p.ignore(pos, expected))
return false;
if (!literal_p.parse(pos, literal, expected))
return false;
if (!colon_p.parse(pos, literal, expected))
return false;
container.push_back(std::move(literal->as<ASTLiteral &>().value));
if (literal_p.parse(pos, literal, expected))
container.push_back(std::move(literal->as<ASTLiteral &>().value));
else
return parseAllCollectionsStart(pos, collections, expected);
}
return true;
}
}
bool ParserAllCollectionsOfLiterals::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
Collections collections;
if (!parseAllCollectionsStart(pos, collections, expected))
return false;
while (!collections.empty())
{
if (!collections.back()->parse(pos, collections, node, expected))
return false;
if (node)
collections.pop_back();
}
return true;
}
bool ParserLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{

View File

@ -301,6 +301,17 @@ protected:
}
};
/** Parses all collections of literals and their various combinations
* Used in parsing parameters for SET query
*/
class ParserAllCollectionsOfLiterals : public IParserBase
{
public:
protected:
const char * getName() const override { return "combination of maps, arrays, tuples"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** The literal is one of: NULL, UInt64, Int64, Float64, String.
*/

View File

@ -4,10 +4,13 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Core/Names.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <Common/FieldVisitorToString.h>
#include <Common/SettingsChanges.h>
#include <Common/typeid_cast.h>
@ -20,21 +23,75 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static NameToNameMap::value_type convertToQueryParameter(SettingChange change)
{
auto name = change.name.substr(strlen(QUERY_PARAMETER_NAME_PREFIX));
if (name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter name cannot be empty");
auto value = applyVisitor(FieldVisitorToString(), change.value);
/// writeQuoted is not always quoted in line with SQL standard https://github.com/ClickHouse/ClickHouse/blob/master/src/IO/WriteHelpers.h
if (value.starts_with('\''))
class ParameterFieldVisitorToString : public StaticVisitor<String>
{
public:
template <class T>
String operator() (const T & x) const
{
ReadBufferFromOwnString buf(value);
readQuoted(value, buf);
FieldVisitorToString visitor;
return visitor(x);
}
return {name, value};
}
String operator() (const Array & x) const
{
WriteBufferFromOwnString wb;
wb << '[';
for (Array::const_iterator it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb.write(", ", 2);
wb << applyVisitor(*this, *it);
}
wb << ']';
return wb.str();
}
String operator() (const Map & x) const
{
WriteBufferFromOwnString wb;
wb << '{';
auto it = x.begin();
while (it != x.end())
{
if (it != x.begin())
wb << ", ";
wb << applyVisitor(*this, *it);
++it;
if (it != x.end())
{
wb << ':';
wb << applyVisitor(*this, *it);
++it;
}
}
wb << '}';
return wb.str();
}
String operator() (const Tuple & x) const
{
WriteBufferFromOwnString wb;
wb << '(';
for (auto it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb << ", ";
wb << applyVisitor(*this, *it);
}
wb << ')';
return wb.str();
}
};
class ParserLiteralOrMap : public IParserBase
@ -89,6 +146,48 @@ protected:
}
};
/// Parse Identifier, Literal, Array/Tuple/Map of literals
bool parseParameterValueIntoString(IParser::Pos & pos, String & value, Expected & expected)
{
ASTPtr node;
/// 1. Identifier
ParserCompoundIdentifier identifier_p;
if (identifier_p.parse(pos, node, expected))
{
tryGetIdentifierNameInto(node, value);
return true;
}
/// 2. Literal
ParserLiteral literal_p;
if (literal_p.parse(pos, node, expected))
{
value = applyVisitor(FieldVisitorToString(), node->as<ASTLiteral>()->value);
/// writeQuoted is not always quoted in line with SQL standard https://github.com/ClickHouse/ClickHouse/blob/master/src/IO/WriteHelpers.h
if (value.starts_with('\''))
{
ReadBufferFromOwnString buf(value);
readQuoted(value, buf);
}
return true;
}
/// 3. Map, Array, Tuple of literals and their combination
ParserAllCollectionsOfLiterals all_collections_p;
if (all_collections_p.parse(pos, node, expected))
{
value = applyVisitor(ParameterFieldVisitorToString(), node->as<ASTLiteral>()->value);
return true;
}
return false;
}
/// Parse `name = value`.
bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & pos, Expected & expected)
{
@ -118,36 +217,58 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
return true;
}
bool ParserSetQuery::parseNameValuePairWithDefault(SettingChange & change, String & default_settings, IParser::Pos & pos, Expected & expected)
bool ParserSetQuery::parseNameValuePairWithParameterOrDefault(
SettingChange & change, String & default_settings, ParserSetQuery::Parameter & parameter, IParser::Pos & pos, Expected & expected)
{
ParserCompoundIdentifier name_p;
ParserLiteralOrMap value_p;
ParserToken s_eq(TokenType::Equals);
ASTPtr name;
ASTPtr value;
bool is_default = false;
ASTPtr node;
String name;
if (!name_p.parse(pos, name, expected))
if (!name_p.parse(pos, node, expected))
return false;
if (!s_eq.ignore(pos, expected))
return false;
tryGetIdentifierNameInto(node, name);
/// Parameter
if (name.starts_with(QUERY_PARAMETER_NAME_PREFIX))
{
name = name.substr(strlen(QUERY_PARAMETER_NAME_PREFIX));
if (name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter name cannot be empty");
String value;
if (!parseParameterValueIntoString(pos, value, expected))
return false;
parameter = {std::move(name), std::move(value)};
return true;
}
/// Default
if (ParserKeyword("DEFAULT").ignore(pos, expected))
{
default_settings = name;
return true;
}
/// Setting
if (ParserKeyword("TRUE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
else if (ParserKeyword("FALSE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
else if (ParserKeyword("DEFAULT").ignore(pos, expected))
is_default = true;
else if (!value_p.parse(pos, value, expected))
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
else if (!value_p.parse(pos, node, expected))
return false;
tryGetIdentifierNameInto(name, change.name);
if (is_default)
default_settings = change.name;
else
change.value = value->as<ASTLiteral &>().value;
change.name = name;
change.value = node->as<ASTLiteral &>().value;
return true;
}
@ -178,19 +299,19 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if ((!changes.empty() || !query_parameters.empty() || !default_settings.empty()) && !s_comma.ignore(pos))
break;
/// Either a setting or a parameter for prepared statement (if name starts with QUERY_PARAMETER_NAME_PREFIX)
SettingChange current;
SettingChange setting;
String name_of_default_setting;
Parameter parameter;
if (!parseNameValuePairWithDefault(current, name_of_default_setting, pos, expected))
if (!parseNameValuePairWithParameterOrDefault(setting, name_of_default_setting, parameter, pos, expected))
return false;
if (current.name.starts_with(QUERY_PARAMETER_NAME_PREFIX))
query_parameters.emplace(convertToQueryParameter(std::move(current)));
if (!parameter.first.empty())
query_parameters.emplace(std::move(parameter));
else if (!name_of_default_setting.empty())
default_settings.emplace_back(std::move(name_of_default_setting));
else
changes.push_back(std::move(current));
changes.push_back(std::move(setting));
}
auto query = std::make_shared<ASTSetQuery>();
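
The three-way dispatch that parseNameValuePairWithParameterOrDefault introduces (parameter, DEFAULT, plain setting) can be sketched outside the parser. This is a minimal Python illustration, not ClickHouse code: `classify_pair` and its tuple results are hypothetical, `param_` is assumed to be the value of QUERY_PARAMETER_NAME_PREFIX (consistent with the `SET param_num=42` tests in this commit), and values are treated as raw, already-tokenized strings rather than parsed literals.

```python
QUERY_PARAMETER_NAME_PREFIX = "param_"  # assumed value of the C++ constant


def classify_pair(name: str, value: str):
    """Classify one `name = value` pair the way the new parser branches do.

    Returns a (kind, name, value) tuple; kind is 'parameter', 'default'
    or 'setting'. This is a sketch: quoting and literal parsing are ignored.
    """
    # Parameter: strip the prefix and reject an empty remainder.
    if name.startswith(QUERY_PARAMETER_NAME_PREFIX):
        short = name[len(QUERY_PARAMETER_NAME_PREFIX):]
        if not short:
            raise ValueError("Parameter name cannot be empty")
        return ("parameter", short, value)
    # Default: only the setting name is recorded, there is no value.
    if value.upper() == "DEFAULT":
        return ("default", name, None)
    # Setting: TRUE/FALSE become 1/0, anything else is kept as written.
    if value.upper() == "TRUE":
        return ("setting", name, 1)
    if value.upper() == "FALSE":
        return ("setting", name, 0)
    return ("setting", name, value)
```

The order of the checks matters: the parameter branch runs first, so `SET param_x = DEFAULT` would be treated as a parameter value, not as a reset of a setting.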


@ -15,9 +15,18 @@ struct SettingChange;
class ParserSetQuery : public IParserBase
{
public:
using Parameter = std::pair<std::string, std::string>;
explicit ParserSetQuery(bool parse_only_internals_ = false) : parse_only_internals(parse_only_internals_) {}
static bool parseNameValuePair(SettingChange & change, IParser::Pos & pos, Expected & expected);
static bool parseNameValuePairWithDefault(SettingChange & change, String & default_settings, IParser::Pos & pos, Expected & expected);
static bool parseNameValuePairWithParameterOrDefault(SettingChange & change,
String & default_settings,
Parameter & parameter,
IParser::Pos & pos,
Expected & expected);
protected:
const char * getName() const override { return "SET query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;


@ -253,7 +253,7 @@ bool IStorage::isStaticStorage() const
if (storage_policy)
{
for (const auto & disk : storage_policy->getDisks())
if (!disk->isReadOnly())
if (!(disk->isReadOnly() || disk->isWriteOnce()))
return false;
return true;
}
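
The changed predicate in isStaticStorage can be read as "every disk is read-only or write-once". A minimal Python sketch under that reading follows; the `Disk` dataclass and `is_static_storage` are hypothetical stand-ins for the C++ disk interface, and the case of a storage without a storage policy is not modeled.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Disk:
    """Hypothetical stand-in for a disk with the two flags used in the hunk."""
    name: str
    read_only: bool = False
    write_once: bool = False


def is_static_storage(disks: Iterable[Disk]) -> bool:
    # A single disk that is neither read-only nor write-once makes the
    # storage non-static, mirroring the early `return false` in the loop.
    return all(d.read_only or d.write_once for d in disks)
```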


@ -583,7 +583,8 @@ public:
/// Returns storage policy if storage supports it.
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
/// Returns true if all disks of storage are read-only.
/// Returns true if all disks of storage are read-only or write-once.
/// NOTE: write-once also does not support INSERTs/merges/... for MergeTree
virtual bool isStaticStorage() const;
virtual bool supportsSubsetOfColumns() const { return false; }


@ -282,8 +282,8 @@ MergeTreeData::MergeTreeData(
checkTTLExpressions(metadata_, metadata_);
/// format_file always contained on any data path
PathWithDisk version_file;
const auto format_version_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
std::optional<UInt32> read_format_version;
/// Create directories if they do not exist.
for (const auto & disk : getDisks())
{
@ -292,42 +292,44 @@ MergeTreeData::MergeTreeData(
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
if (disk->exists(current_version_file_path))
if (disk->exists(format_version_path))
{
if (!version_file.first.empty())
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplication of version file {} and {}", fullPath(version_file.second, version_file.first), current_version_file_path);
version_file = {current_version_file_path, disk};
auto buf = disk->readFile(format_version_path);
UInt32 current_format_version{0};
readIntText(current_format_version, *buf);
if (!buf->eof())
throw Exception(ErrorCodes::CORRUPTED_DATA, "Bad version file: {}", fullPath(disk, format_version_path));
if (!read_format_version.has_value())
read_format_version = current_format_version;
else if (*read_format_version != current_format_version)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Version file on {} contains version {} expected version is {}.", fullPath(disk, format_version_path), current_format_version, *read_format_version);
}
}
/// If not choose any
if (version_file.first.empty())
version_file = {fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME, getStoragePolicy()->getAnyDisk()};
bool version_file_exists = version_file.second->exists(version_file.first);
// When the data path or the version file does not exist, skip the format_version check
if (!attach || !version_file_exists)
if (!attach || !read_format_version)
{
format_version = min_format_version;
if (!version_file.second->isReadOnly())
// Try to write to the first non-read-only disk
for (const auto & disk : getStoragePolicy()->getDisks())
{
auto buf = version_file.second->writeFile(version_file.first, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, context_->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
if (!disk->isReadOnly())
{
auto buf = disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, context_->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
break;
}
}
}
else
{
auto buf = version_file.second->readFile(version_file.first);
UInt32 read_format_version;
readIntText(read_format_version, *buf);
format_version = read_format_version;
if (!buf->eof())
throw Exception("Bad version file: " + fullPath(version_file.second, version_file.first), ErrorCodes::CORRUPTED_DATA);
format_version = *read_format_version;
}
if (format_version < min_format_version)
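
The new logic in this hunk reads the version file from every disk that has one, requires all of them to agree, and writes a fresh file to the first disk that is not read-only. A minimal Python sketch of that flow is below. The function names, the `CorruptedData` exception, and the dictionary-based disk model are hypothetical; the digit-only check only approximates the `readIntText` plus `eof()` test in the C++ code.

```python
from typing import Dict, List, Optional


class CorruptedData(Exception):
    """Stand-in for ErrorCodes::CORRUPTED_DATA."""


def read_common_format_version(version_files: Dict[str, Optional[str]]) -> Optional[int]:
    """Return the version shared by all disks that have a version file.

    `version_files` maps a disk name to the file's text, or None if the
    disk has no version file. Returns None when no disk has one.
    """
    common: Optional[int] = None
    for disk, text in version_files.items():
        if text is None:
            continue
        # Anything other than a plain decimal integer is a bad version file.
        if not (text.isascii() and text.isdigit()):
            raise CorruptedData(f"Bad version file on {disk}")
        current = int(text)
        if common is None:
            common = current
        elif common != current:
            raise CorruptedData(
                f"Version file on {disk} contains version {current}, "
                f"expected version is {common}"
            )
    return common


def first_writable_disk(disks: List[str], read_only: Dict[str, bool]) -> Optional[str]:
    """Pick the first disk that is not read-only, or None if there is none."""
    for disk in disks:
        if not read_only.get(disk, False):
            return disk
    return None
```

Compared with the removed code, a version file present on more than one disk is no longer an error by itself; only disagreeing contents are.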


@ -314,18 +314,32 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
TTLDeleteMergeSelector drop_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
true);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
/// The size limit for completely expired parts (TTL drop) does not depend on merge pressure or on free storage space
parts_to_merge = drop_ttl_selector.select(parts_ranges, data_settings->max_bytes_to_merge_at_max_space_in_pool);
if (!parts_to_merge.empty())
{
future_part->merge_type = MergeType::TTLDelete;
}
else if (metadata_snapshot->hasAnyRecompressionTTL())
else if (!data_settings->ttl_only_drop_parts)
{
TTLDeleteMergeSelector delete_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
false);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
future_part->merge_type = MergeType::TTLDelete;
}
if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL())
{
TTLRecompressMergeSelector recompress_ttl_selector(
next_recompress_ttl_merge_times_by_partition,
@ -621,8 +635,16 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
for (const MergeTreeData::DataPartPtr & part : source_parts)
{
/// Exclude expired parts
time_t part_max_ttl = part->ttl_infos.part_max_ttl;
if (part_max_ttl && part_max_ttl <= current_time)
continue;
res += part->getBytesOnDisk();
}
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
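
The effect of skipping expired parts in estimateNeededDiskSpace can be checked with a small Python sketch. The `Part` dataclass and the function name are hypothetical, and the coefficient value 1.1 is an assumption made for illustration (the diff only shows the constant's name).

```python
import time
from dataclasses import dataclass
from typing import Iterable, Optional

DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1  # assumed value, not shown in the diff


@dataclass
class Part:
    """Hypothetical stand-in for a data part."""
    bytes_on_disk: int
    part_max_ttl: int = 0  # 0 means the part has no TTL


def estimate_needed_disk_space(parts: Iterable[Part], now: Optional[int] = None) -> int:
    now = int(time.time()) if now is None else now
    total = 0
    for part in parts:
        # A fully expired part will be dropped by the merge, so it needs no space.
        if part.part_max_ttl and part.part_max_ttl <= now:
            continue
        total += part.bytes_on_disk
    return int(total * DISK_USAGE_COEFFICIENT_TO_RESERVE)
```

With parts of 100, 200 (expired) and 300 bytes, only 400 bytes are counted before the reserve coefficient is applied.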


@ -25,6 +25,10 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
{"keep_free_space", std::make_shared<DataTypeUInt64>()},
{"type", std::make_shared<DataTypeString>()},
{"is_encrypted", std::make_shared<DataTypeUInt8>()},
{"is_read_only", std::make_shared<DataTypeUInt8>()},
{"is_write_once", std::make_shared<DataTypeUInt8>()},
{"is_remote", std::make_shared<DataTypeUInt8>()},
{"is_broken", std::make_shared<DataTypeUInt8>()},
{"cache_path", std::make_shared<DataTypeString>()},
}));
setInMemoryMetadata(storage_metadata);
@ -49,6 +53,10 @@ Pipe StorageSystemDisks::read(
MutableColumnPtr col_keep = ColumnUInt64::create();
MutableColumnPtr col_type = ColumnString::create();
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
MutableColumnPtr col_is_read_only = ColumnUInt8::create();
MutableColumnPtr col_is_write_once = ColumnUInt8::create();
MutableColumnPtr col_is_remote = ColumnUInt8::create();
MutableColumnPtr col_is_broken = ColumnUInt8::create();
MutableColumnPtr col_cache_path = ColumnString::create();
for (const auto & [disk_name, disk_ptr] : context->getDisksMap())
@ -62,6 +70,10 @@ Pipe StorageSystemDisks::read(
auto data_source_description = disk_ptr->getDataSourceDescription();
col_type->insert(toString(data_source_description.type));
col_is_encrypted->insert(data_source_description.is_encrypted);
col_is_read_only->insert(disk_ptr->isReadOnly());
col_is_write_once->insert(disk_ptr->isWriteOnce());
col_is_remote->insert(disk_ptr->isRemote());
col_is_broken->insert(disk_ptr->isBroken());
String cache_path;
if (disk_ptr->supportsCache())
@ -79,6 +91,10 @@ Pipe StorageSystemDisks::read(
res_columns.emplace_back(std::move(col_keep));
res_columns.emplace_back(std::move(col_type));
res_columns.emplace_back(std::move(col_is_encrypted));
res_columns.emplace_back(std::move(col_is_read_only));
res_columns.emplace_back(std::move(col_is_write_once));
res_columns.emplace_back(std::move(col_is_remote));
res_columns.emplace_back(std::move(col_is_broken));
res_columns.emplace_back(std::move(col_cache_path));
UInt64 num_rows = res_columns.at(0)->size();


@ -86,6 +86,16 @@ private:
struct TableFunctionProperties
{
Documentation documentation;
/** It is determined by the possibility of modifying any data or making requests to arbitrary hostnames.
*
* If users can make a request to an arbitrary hostname, they can get the info from the internal network
* or manipulate internal APIs (say - put some data into Memcached, which is available only in the corporate network).
* This is named "SSRF attack".
* Or a user can use an open ClickHouse server to amplify DoS attacks.
*
* In those cases, the table function should not be allowed in readonly mode.
*/
bool allow_readonly = false;
};


@ -1,13 +0,0 @@
FROM public.ecr.aws/lambda/python:3.9
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]


@ -1,5 +1,6 @@
#!/usr/bin/env python3
from base64 import b64decode
from collections import namedtuple
from typing import Any, Dict, List
from threading import Thread
@ -19,26 +20,25 @@ NEED_RERUN_OR_CANCELL_WORKFLOWS = {
"BackportPR",
}
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
#
API_URL = os.getenv("API_URL", "https://api.github.com/repos/ClickHouse/ClickHouse")
MAX_RETRY = 5
DEBUG_INFO = {} # type: Dict[str, Any]
class Worker(Thread):
def __init__(self, request_queue: Queue, ignore_exception: bool = False):
def __init__(
self, request_queue: Queue, token: str, ignore_exception: bool = False
):
Thread.__init__(self)
self.queue = request_queue
self.token = token
self.ignore_exception = ignore_exception
self.response = {} # type: Dict
def run(self):
m = self.queue.get()
try:
self.response = _exec_get_with_retry(m)
self.response = _exec_get_with_retry(m, self.token)
except Exception as e:
if not self.ignore_exception:
raise
@ -98,10 +98,11 @@ def get_token_from_aws():
return get_access_token(encoded_jwt, installation_id)
def _exec_get_with_retry(url):
def _exec_get_with_retry(url: str, token: str) -> dict:
headers = {"Authorization": f"token {token}"}
for i in range(MAX_RETRY):
try:
response = requests.get(url)
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except Exception as ex:
@ -113,23 +114,25 @@ def _exec_get_with_retry(url):
WorkflowDescription = namedtuple(
"WorkflowDescription",
["run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
["url", "run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
)
def get_workflows_description_for_pull_request(
pull_request_event,
token,
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("PR", pull_request_event["number"], "has head ref", head_branch)
workflows_data = []
request_url = f"{API_URL}/actions/runs?per_page=100"
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
# Get all workflows for the current branch
for i in range(1, 11):
workflows = _exec_get_with_retry(
f"{request_url}&event=pull_request&branch={head_branch}&page={i}"
f"{request_url}&event=pull_request&branch={head_branch}&page={i}", token
)
if not workflows["workflow_runs"]:
break
@ -164,6 +167,7 @@ def get_workflows_description_for_pull_request(
):
workflow_descriptions.append(
WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
head_sha=workflow["head_sha"],
status=workflow["status"],
@ -176,19 +180,22 @@ def get_workflows_description_for_pull_request(
return workflow_descriptions
def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescription]:
def get_workflow_description_fallback(
pull_request_event, token
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("Get last 500 workflows from API to search related there")
# Fallback for a case of an already deleted branch and no workflows received
request_url = f"{API_URL}/actions/runs?per_page=100"
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
q = Queue() # type: Queue
workers = []
workflows_data = []
i = 1
for i in range(1, 6):
q.put(f"{request_url}&page={i}")
worker = Worker(q, True)
worker = Worker(q, token, True)
worker.start()
workers.append(worker)
@ -220,6 +227,7 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
workflow_descriptions = [
WorkflowDescription(
url=wf["url"],
run_id=wf["id"],
head_sha=wf["head_sha"],
status=wf["status"],
@ -233,9 +241,10 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
return workflow_descriptions
def get_workflow_description(workflow_id) -> WorkflowDescription:
workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
def get_workflow_description(workflow_url, token) -> WorkflowDescription:
workflow = _exec_get_with_retry(workflow_url, token)
return WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
head_sha=workflow["head_sha"],
status=workflow["status"],
@ -268,8 +277,11 @@ def exec_workflow_url(urls_to_cancel, token):
def main(event):
token = get_token_from_aws()
DEBUG_INFO["event_body"] = event["body"]
event_data = json.loads(event["body"])
DEBUG_INFO["event"] = event
if event["isBase64Encoded"]:
event_data = json.loads(b64decode(event["body"]))
else:
event_data = json.loads(event["body"])
print("Got event for PR", event_data["number"])
action = event_data["action"]
@ -279,9 +291,12 @@ def main(event):
print("PR has labels", labels)
if action == "closed" or "do not test" in labels:
print("PR merged/closed or manually labeled 'do not test' will kill workflows")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
@ -294,9 +309,12 @@ def main(event):
exec_workflow_url(urls_to_cancel, token)
elif action == "synchronize":
print("PR is synchronized, going to stop old actions")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
@ -308,11 +326,14 @@ def main(event):
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
elif action == "labeled" and "can be tested" in labels:
elif action == "labeled" and event_data["label"]["name"] == "can be tested":
print("PR marked with can be tested label, rerun workflow")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions or get_workflow_description_fallback(pull_request)
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
if not workflow_descriptions:
print("Not found any workflows")
@ -330,7 +351,10 @@ def main(event):
print("Cancelled")
for _ in range(45):
latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id)
# If the number of retries is changed: tune the lambda limits accordingly
latest_workflow_desc = get_workflow_description(
most_recent_workflow.url, token
)
print("Checking latest workflow", latest_workflow_desc)
if latest_workflow_desc.status in ("completed", "cancelled"):
print("Finally latest workflow done, going to rerun")
@ -347,6 +371,12 @@ def main(event):
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
finally:
for name, value in DEBUG_INFO.items():
print(f"Value of {name}: ", value)
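
The body handling added to `main` (decode base64 first when `isBase64Encoded` is set, then parse JSON) can be isolated into a helper. The sketch below is illustrative only: `decode_event_body` is a hypothetical name, the sample payload is made up, and `.get()` is used instead of the direct `event["isBase64Encoded"]` lookup from the diff so that events without the key are treated as plain text.

```python
import json
from base64 import b64decode, b64encode


def decode_event_body(event: dict) -> dict:
    """Return the parsed JSON body of a Lambda-style event."""
    body = event["body"]
    if event.get("isBase64Encoded"):
        # json.loads accepts bytes, so the decoded body can be passed as is.
        body = b64decode(body)
    return json.loads(body)


# Made-up payload, encoded both ways for comparison.
sample_payload = {"action": "closed", "number": 1}
plain_event = {"body": json.dumps(sample_payload), "isBase64Encoded": False}
encoded_event = {
    "body": b64encode(json.dumps(sample_payload).encode()).decode(),
    "isBase64Encoded": True,
}
```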


@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh


@ -491,6 +491,12 @@ def main(event):
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
except Exception:
print("Received event: ", event)
raise


@ -3332,7 +3332,7 @@ class ClickHouseInstance:
except Exception as e:
logging.warning(f"Stop ClickHouse raised an error {e}")
def start_clickhouse(self, start_wait_sec=60):
def start_clickhouse(self, start_wait_sec=60, retry_start=True):
if not self.stay_alive:
raise Exception(
"ClickHouse can be started again only with stay_alive=True instance"
@ -3364,6 +3364,8 @@ class ClickHouseInstance:
self.exec_in_container(
["bash", "-c", f"kill -9 {pid}"], user="root", nothrow=True
)
if not retry_start:
raise
time.sleep(time_to_sleep)
raise Exception("Cannot start ClickHouse, see additional info in logs")


@ -39,3 +39,15 @@ def wait_until_quorum_lost(cluster, node, port=9181):
def wait_nodes(cluster, nodes):
for node in nodes:
wait_until_connected(cluster, node)
def is_leader(cluster, node, port=9181):
stat = send_4lw_cmd(cluster, node, "stat", port)
return "Mode: leader" in stat
def get_leader(cluster, nodes):
for node in nodes:
if is_leader(cluster, node):
return node
raise Exception("No leader in Keeper cluster.")
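
The leader lookup above depends only on the text returned by the `stat` four-letter command. A network-free sketch of the same logic, useful for reasoning about it in isolation, is below. `find_leader` and the dictionary of pre-fetched outputs are hypothetical, and the sample strings are illustrative, not real Keeper output.

```python
from typing import Dict


def is_leader_output(stat_output: str) -> bool:
    # Same substring check as is_leader() in the hunk above.
    return "Mode: leader" in stat_output


def find_leader(stat_by_node: Dict[str, str]) -> str:
    """Return the first node whose `stat` output reports leader mode."""
    for node, stat in stat_by_node.items():
        if is_leader_output(stat):
            return node
    raise RuntimeError("No leader in Keeper cluster.")
```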


@ -30,9 +30,7 @@ def start_cluster():
pytest.param("wide", "backup_wide", "s3_backup_wide", int(0), id="wide"),
],
)
def test_attach_compact_part(
table_name, backup_name, storage_policy, min_bytes_for_wide_part
):
def test_attach_part(table_name, backup_name, storage_policy, min_bytes_for_wide_part):
node.query(
f"""
-- Catch any errors (NOTE: warnings are ok)
@ -61,9 +59,6 @@ def test_attach_compact_part(
node.query(
f"""
-- NOTE: be aware not to DROP the table, but DETACH first to keep it in S3.
detach table ordinary_db.{table_name};
-- NOTE: DROP DATABASE cannot be done w/o this due to metadata leftovers
set force_remove_data_recursively_on_drop=1;
drop database ordinary_db sync;


@ -13,7 +13,10 @@ def cluster():
"node1", main_configs=["configs/storage_conf.xml"], with_nginx=True
)
cluster.add_instance(
"node2", main_configs=["configs/storage_conf_web.xml"], with_nginx=True
"node2",
main_configs=["configs/storage_conf_web.xml"],
with_nginx=True,
stay_alive=True,
)
cluster.add_instance(
"node3", main_configs=["configs/storage_conf_web.xml"], with_nginx=True
@ -192,3 +195,53 @@ def test_cache(cluster, node_name):
node2.query("DROP TABLE test{} SYNC".format(i))
print(f"Ok {i}")
def test_unavailable_server(cluster):
"""
Regression test for the case when clickhouse-server simply ignores an
unavailable server on start and later returns 0 rows for
SELECT from a table on the web disk.
"""
node2 = cluster.instances["node2"]
global uuids
node2.query(
"""
ATTACH TABLE test0 UUID '{}'
(id Int32) ENGINE = MergeTree() ORDER BY id
SETTINGS storage_policy = 'web';
""".format(
uuids[0]
)
)
node2.stop_clickhouse()
try:
# NOTE: you cannot use a separate disk instead, since the MergeTree engine will
# try to look up parts on all disks (to find unexpected disks with parts)
# and fail because of the unavailable server.
node2.exec_in_container(
[
"bash",
"-c",
"sed -i 's#http://nginx:80/test1/#http://nginx:8080/test1/#' /etc/clickhouse-server/config.d/storage_conf_web.xml",
]
)
with pytest.raises(Exception):
# HTTP retries with backoff can take a while
node2.start_clickhouse(start_wait_sec=120, retry_start=False)
assert node2.contains_in_log(
"Caught exception while loading metadata.*Connection refused"
)
assert node2.contains_in_log(
"HTTP request to \`http://nginx:8080/test1/.*\` failed at try 1/10 with bytes read: 0/unknown. Error: Connection refused."
)
finally:
node2.exec_in_container(
[
"bash",
"-c",
"sed -i 's#http://nginx:8080/test1/#http://nginx:80/test1/#' /etc/clickhouse-server/config.d/storage_conf_web.xml",
]
)
node2.start_clickhouse()
node2.query("DROP TABLE test0 SYNC")


@ -12,7 +12,7 @@
</disk_memory>
<disk_hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/data/</endpoint>
<endpoint>hdfs://hdfs1:9000/</endpoint>
</disk_hdfs>
<disk_encrypted>
<type>encrypted</type>


@ -22,6 +22,7 @@ def cluster():
with_hdfs=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()


@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>


@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>


@ -33,7 +33,7 @@
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>false</can_become_leader>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>


@ -148,10 +148,11 @@ def test_cmd_mntr(started_cluster):
wait_nodes()
clear_znodes()
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
# reset stat first
reset_node_stats(node1)
reset_node_stats(leader)
zk = get_fake_zk(node1.name, timeout=30.0)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(
zk,
create_cnt=10,
@ -162,7 +163,7 @@ def test_cmd_mntr(started_cluster):
delete_cnt=2,
)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="mntr")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="mntr")
# print(data.decode())
reader = csv.reader(data.split("\n"), delimiter="\t")
@ -307,12 +308,13 @@ def test_cmd_srvr(started_cluster):
wait_nodes()
clear_znodes()
reset_node_stats(node1)
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
reset_node_stats(leader)
zk = get_fake_zk(node1.name, timeout=30.0)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(zk, create_cnt=10)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="srvr")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="srvr")
print("srvr output -------------------------------------")
print(data)
@ -329,7 +331,7 @@ def test_cmd_srvr(started_cluster):
assert result["Received"] == "10"
assert result["Sent"] == "10"
assert int(result["Connections"]) == 1
assert int(result["Zxid"]) > 14
assert int(result["Zxid"]) > 10
assert result["Mode"] == "leader"
assert result["Node count"] == "13"
@ -342,13 +344,15 @@ def test_cmd_stat(started_cluster):
try:
wait_nodes()
clear_znodes()
reset_node_stats(node1)
reset_conn_stats(node1)
zk = get_fake_zk(node1.name, timeout=30.0)
leader = keeper_utils.get_leader(cluster, [node1, node2, node3])
reset_node_stats(leader)
reset_conn_stats(leader)
zk = get_fake_zk(leader.name, timeout=30.0)
do_some_action(zk, create_cnt=10)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="stat")
data = keeper_utils.send_4lw_cmd(cluster, leader, cmd="stat")
print("stat output -------------------------------------")
print(data)
@ -604,6 +608,10 @@ def test_cmd_csnp(started_cluster):
wait_nodes()
zk = get_fake_zk(node1.name, timeout=30.0)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="csnp")
print("csnp output -------------------------------------")
print(data)
try:
int(data)
assert True
@ -623,7 +631,10 @@ def test_cmd_lgif(started_cluster):
do_some_action(zk, create_cnt=100)
data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="lgif")
print("lgif output -------------------------------------")
print(data)
reader = csv.reader(data.split("\n"), delimiter="\t")
result = {}
@ -641,3 +652,28 @@ def test_cmd_lgif(started_cluster):
assert int(result["last_snapshot_idx"]) >= 1
finally:
destroy_zk_client(zk)
def test_cmd_rqld(started_cluster):
wait_nodes()
# node2 cannot become leader
for node in [node1, node3]:
data = keeper_utils.send_4lw_cmd(cluster, node, cmd="rqld")
assert data == "Sent leadership request to leader."
print("rqld output -------------------------------------")
print(data)
if not keeper_utils.is_leader(cluster, node):
# poll until the node becomes leader
retry = 0
# TODO: this is not a strict check
while not keeper_utils.is_leader(cluster, node) and retry < 30:
time.sleep(1)
retry += 1
if retry == 30:
print(
node.name
+ " does not become leader after 30s, maybe there is something wrong."
)
assert keeper_utils.is_leader(cluster, node)
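
The retry loop in test_cmd_rqld is a bounded polling pattern. One way to factor it out is sketched below; `wait_until` is a hypothetical helper that does not exist in the test helpers shown in this commit, and the injectable `sleep` argument is there only to make the sketch testable without real delays.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout_sec: float = 30,
    interval_sec: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `predicate` until it returns True or the attempts run out.

    Returns the final result of the predicate, so the caller can assert on it.
    """
    attempts = max(1, int(timeout_sec / interval_sec))
    for _ in range(attempts):
        if predicate():
            return True
        sleep(interval_sec)
    # One last check after the final sleep.
    return predicate()
```

In the test above this would read roughly as `assert wait_until(lambda: keeper_utils.is_leader(cluster, node))`, assuming such a helper were added.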


@ -4,6 +4,8 @@
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs>
</disks>
</storage_configuration>


@ -4,6 +4,8 @@
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs>
<hdd>
<type>local</type>


@ -4,14 +4,20 @@
<hdfs1>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse1/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs1>
<hdfs1_again>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse1/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs1_again>
<hdfs2>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse2/</endpoint>
<!-- FIXME: chicken and egg problem with current cluster.py -->
<skip_access_check>true</skip_access_check>
</hdfs2>
</disks>
<policies>


@ -0,0 +1,18 @@
<clickhouse>
<storage_configuration>
<disks>
<disk2>
<path>/var/lib/clickhouse2/</path>
</disk2>
</disks>
<policies>
<test_policy>
<volumes>
<volume2>
<disk>disk2</disk>
</volume2>
</volumes>
</test_policy>
</policies>
</storage_configuration>
</clickhouse>


@ -0,0 +1,24 @@
<clickhouse>
<storage_configuration>
<disks>
<disk1>
<path>/var/lib/clickhouse1/</path>
</disk1>
<disk2>
<path>/var/lib/clickhouse2/</path>
</disk2>
</disks>
<policies>
<test_policy>
<volumes>
<volume1>
<disk>disk1</disk>
</volume1>
<volume2>
<disk>disk2</disk>
</volume2>
</volumes>
</test_policy>
</policies>
</storage_configuration>
</clickhouse>


@ -0,0 +1,40 @@
import os
import pytest
from helpers.test_tools import TSV
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_DIR = os.path.join(SCRIPT_DIR, "configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/disks.xml"], stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_storage_policy_configuration_change(started_cluster):
node.query(
"CREATE TABLE a (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'test_policy'"
)
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "disk2_only.xml"),
"/etc/clickhouse-server/config.d/disks.xml",
)
node.start_clickhouse()
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "disks.xml"),
"/etc/clickhouse-server/config.d/disks.xml",
)
node.start_clickhouse()


@ -21,5 +21,5 @@ SYSTEM FLUSH LOGS;
SELECT read_rows < 110000 FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase()
AND event_time > now() - INTERVAL 10 SECOND
AND event_date >= yesterday()
AND lower(query) LIKE lower('SELECT s FROM order_by_desc ORDER BY u%');


@ -190,6 +190,10 @@ CREATE TABLE system.disks
`keep_free_space` UInt64,
`type` String,
`is_encrypted` UInt8,
`is_read_only` UInt8,
`is_write_once` UInt8,
`is_remote` UInt8,
`is_broken` UInt8,
`cache_path` String
)
ENGINE = SystemDisks


@ -0,0 +1,9 @@
42 hello 2022-08-04 18:30:53 {'2b95a497-3a5d-49af-bf85-15763318cde7':[1.2,3.4]}
UInt64 String DateTime Map(UUID, Array(Float32))
42 [1,2,3] {'abc':22,'def':33} [[4,5,6],[7],[8,9]] {10:[11,12],13:[14,15]} {'ghj':{'klm':[16,17]},'nop':{'rst':[18]}}
Int64 Array(UInt8) Map(String, UInt8) Array(Array(UInt8)) Map(UInt8, Array(UInt8)) Map(String, Map(String, Array(UInt8)))
5
[[['a','b','c'],['d','e','f']],[['g','h','i'],['j','k','l']]] Array(Array(Array(String)))
(((1,'a','2b95a497-3a5d-49af-bf85-15763318cde7',3.14))) Tuple(Tuple(Tuple(Int32, String, UUID, Float32)))
[{1:(2,'2022-08-04 18:30:53','s'),3:(4,'2020-08-04 18:30:53','t')}] Array(Map(UInt64, Tuple(Int16, DateTime, String)))
{'a':[(1,{10:1,20:2}),(2,{30:3,40:4})],'b':[(3,{50:5,60:6}),(4,{70:7,80:8})]} Map(String, Array(Tuple(UInt8, Map(UInt32, Int64))))


@ -0,0 +1,29 @@
SET param_num=42;
SET param_str='hello';
SET param_date='2022-08-04 18:30:53';
SET param_map={'2b95a497-3a5d-49af-bf85-15763318cde7': [1.2, 3.4]};
SELECT {num:UInt64}, {str:String}, {date:DateTime}, {map:Map(UUID, Array(Float32))};
SELECT toTypeName({num:UInt64}), toTypeName({str:String}), toTypeName({date:DateTime}), toTypeName({map:Map(UUID, Array(Float32))});
SET param_id=42;
SET param_arr=[1, 2, 3];
SET param_map_2={'abc': 22, 'def': 33};
SET param_mul_arr=[[4, 5, 6], [7], [8, 9]];
SET param_map_arr={10: [11, 12], 13: [14, 15]};
SET param_map_map_arr={'ghj': {'klm': [16, 17]}, 'nop': {'rst': [18]}};
SELECT {id: Int64}, {arr: Array(UInt8)}, {map_2: Map(String, UInt8)}, {mul_arr: Array(Array(UInt8))}, {map_arr: Map(UInt8, Array(UInt8))}, {map_map_arr: Map(String, Map(String, Array(UInt8)))};
SELECT toTypeName({id: Int64}), toTypeName({arr: Array(UInt8)}), toTypeName({map_2: Map(String, UInt8)}), toTypeName({mul_arr: Array(Array(UInt8))}), toTypeName({map_arr: Map(UInt8, Array(UInt8))}), toTypeName({map_map_arr: Map(String, Map(String, Array(UInt8)))});
SET param_tbl=numbers;
SET param_db=system;
SET param_col=number;
SELECT {col:Identifier} FROM {db:Identifier}.{tbl:Identifier} LIMIT 1 OFFSET 5;
SET param_arr_arr_arr=[[['a', 'b', 'c'], ['d', 'e', 'f']], [['g', 'h', 'i'], ['j', 'k', 'l']]];
SET param_tuple_tuple_tuple=(((1, 'a', '2b95a497-3a5d-49af-bf85-15763318cde7', 3.14)));
SET param_arr_map_tuple=[{1:(2, '2022-08-04 18:30:53', 's'), 3:(4, '2020-08-04 18:30:53', 't')}];
SET param_map_arr_tuple_map={'a':[(1,{10:1, 20:2}),(2, {30:3, 40:4})], 'b':[(3, {50:5, 60:6}),(4, {70:7, 80:8})]};
SELECT {arr_arr_arr: Array(Array(Array(String)))}, toTypeName({arr_arr_arr: Array(Array(Array(String)))});
SELECT {tuple_tuple_tuple: Tuple(Tuple(Tuple(Int32, String, UUID, Float32)))}, toTypeName({tuple_tuple_tuple: Tuple(Tuple(Tuple(Int32, String, UUID, Float32)))});
SELECT {arr_map_tuple: Array(Map(UInt64, Tuple(Int16, DateTime, String)))}, toTypeName({arr_map_tuple: Array(Map(UInt64, Tuple(Int16, DateTime, String)))});
SELECT {map_arr_tuple_map: Map(String, Array(Tuple(UInt8, Map(UInt32, Int64))))}, toTypeName({map_arr_tuple_map: Map(String, Array(Tuple(UInt8, Map(UInt32, Int64))))});

View File

@ -0,0 +1,2 @@
199998
199998

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS 02481_mergetree;
DROP TABLE IF EXISTS 02481_merge;
CREATE TABLE 02481_mergetree(x UInt64, y UInt64, arr Array(String)) ENGINE = MergeTree ORDER BY x SAMPLE BY x;
CREATE TABLE 02481_merge(x UInt64, y UInt64, arr Array(String)) ENGINE = Merge(currentDatabase(), '^(02481_mergetree)$');
INSERT INTO 02481_mergetree SELECT number, number + 1, [1,2] FROM system.numbers LIMIT 100000;
SELECT count() FROM 02481_mergetree SAMPLE 1 / 2 ARRAY JOIN arr WHERE x != 0;
SELECT count() FROM 02481_merge SAMPLE 1 / 2 ARRAY JOIN arr WHERE x != 0;
DROP TABLE 02481_mergetree;
DROP TABLE 02481_merge;

View File

@ -5,6 +5,7 @@ v22.9.4.32-stable 2022-10-26
v22.9.3.18-stable 2022-09-30
v22.9.2.7-stable 2022-09-23
v22.9.1.2603-stable 2022-09-22
v22.8.9.24-lts 2022-11-19
v22.8.8.3-lts 2022-10-27
v22.8.7.34-lts 2022-10-26
v22.8.6.71-lts 2022-09-30

View File

@ -1,80 +0,0 @@
#!/bin/bash
# This is a script to automate the SECURITY.md generation in the repository root.
# The logic is the following:
# We support the latest ClickHouse Y.M stable release,
# the two releases before the latest stable,
# and the two latest LTS releases (which may be already included by the criteria above).
# The LTS releases are every Y.3 and Y.8 stable release.
echo "
# Security Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Scope and Supported Versions
The following versions of ClickHouse server are currently being supported with security updates:
"
clickhouse-local --query "
SELECT
y::String || '.' || (y < toYear(today()) - 2000 - 1 ? '*' : m::String) AS Version,
(n <= 3 OR (is_lts AND lts_n <= 2)) ? '✔️' : '❌' AS Supported
FROM
(
SELECT
y,
m,
count() OVER (ORDER BY y DESC, m DESC) AS n,
m IN (3, 8) AS is_lts,
countIf(is_lts) OVER (ORDER BY y DESC, m DESC) AS lts_n
FROM
(
WITH
extractGroups(version, 'v(\\d+)\\.(\\d+)') AS v,
v[1]::UInt8 AS y,
v[2]::UInt8 AS m
SELECT
y,
m
FROM file('$(dirname "${BASH_SOURCE[0]}")/../list-versions/version_date.tsv', TSV, 'version String, date String')
ORDER BY
y DESC,
m DESC
LIMIT 1 BY
y,
m
)
)
LIMIT 1 BY Version
FORMAT Markdown"
echo "
## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?
- You think you discovered a potential security vulnerability in ClickHouse
- You are unsure how a vulnerability affects ClickHouse
### When Should I NOT Report a Vulnerability?
- You need help tuning ClickHouse components for security
- You need help applying security related updates
- Your issue is not security related
## Security Vulnerability Response
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
"
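
The rule described in the script comments and encoded in the `clickhouse-local` query can be restated outside SQL. The following is a minimal Python sketch, not part of the commit: `support_rows`, `LTS_MONTHS`, and the `current_year` parameter (standing in for `toYear(today()) - 2000`) are hypothetical names introduced here, and the input is assumed to be `(year, month)` pairs already extracted from `version_date.tsv`.

```python
from typing import Iterable, List, Tuple

# Per the script comment: every Y.3 and Y.8 release is an LTS release.
LTS_MONTHS = (3, 8)


def support_rows(
    releases: Iterable[Tuple[int, int]], current_year: int
) -> List[Tuple[str, bool]]:
    """Return (version label, supported) pairs, newest first.

    Hypothetical helper: mirrors the n / lts_n ranking of the query.
    """
    rows: List[Tuple[str, bool]] = []
    seen = set()
    lts_seen = 0
    # Deduplicate the (year, month) pairs and walk them from newest to oldest.
    for rank, (year, month) in enumerate(sorted(set(releases), reverse=True), 1):
        is_lts = month in LTS_MONTHS
        if is_lts:
            lts_seen += 1
        # Supported: one of the 3 newest releases, or one of the 2 newest LTS.
        supported = rank <= 3 or (is_lts and lts_seen <= 2)
        # Years older than the previous year collapse into a single "Y.*" row.
        label = f"{year}.*" if year < current_year - 1 else f"{year}.{month}"
        if label not in seen:
            seen.add(label)
            rows.append((label, supported))
    return rows


sample = [(22, 11), (22, 10), (22, 9), (22, 8), (22, 7), (22, 3), (21, 12), (20, 8)]
for label, supported in support_rows(sample, current_year=22):
    print(label, "supported" if supported else "unsupported")
```

When an old year is collapsed, the sketch keeps the flag of that year's newest release. The query's `LIMIT 1 BY Version` has no explicit outer `ORDER BY`, so treating it the same way is an assumption.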

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
from pathlib import Path
from typing import List
VERSIONS_FILE = (
Path(__file__).absolute().parent.parent / "list-versions" / "version_date.tsv"
)
HEADER = """<!--
the file is autogenerated by utils/security-generator/generate_security.py
-->
# Security Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Scope and Supported Versions
The following versions of ClickHouse server are currently being supported with security updates:
"""
FOOTER = """## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?
- You think you discovered a potential security vulnerability in ClickHouse
- You are unsure how a vulnerability affects ClickHouse
### When Should I NOT Report a Vulnerability?
- You need help tuning ClickHouse components for security
- You need help applying security related updates
- Your issue is not security related
## Security Vulnerability Response
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
"""
def generate_supported_versions():
    with open(VERSIONS_FILE, "r", encoding="utf-8") as fd:
        versions = [line.split(maxsplit=1)[0][1:] for line in fd.readlines()]
    # The versions in VERSIONS_FILE are ordered descending, so the first one is
    # the greatest one. We may have supported versions in the previous year
    unsupported_year = int(versions[0].split(".", maxsplit=1)[0]) - 2
    # 3 supported versions
    supported = []  # type: List[str]
    # 2 LTS versions, one of them could be in supported
    lts = []  # type: List[str]
    # The rest are unsupported
    unsupported = []  # type: List[str]
    table = [
        "| Version | Supported |",
        "|:-|:-|",
    ]
    for version in versions:
        year = int(version.split(".")[0])
        month = int(version.split(".")[1])
        version = f"{year}.{month}"
        if version in supported or version in lts:
            continue
        if len(supported) < 3:
            supported.append(version)
            if len(lts) < 2 and month in [3, 8]:
                # The version can be LTS as well
                lts.append(version)
            table.append(f"| {version} | ✔️ |")
            continue
        if len(lts) < 2 and month in [3, 8]:
            lts.append(version)
            table.append(f"| {version} | ✔️ |")
            continue
        if year <= unsupported_year:
            # The whole year is unsupported
            version = f"{year}.*"
        if version not in unsupported:
            unsupported.append(version)
            table.append(f"| {version} | ❌ |")
    return "\n".join(table) + "\n"


def main():
    print(HEADER)
    print(generate_supported_versions())
    print(FOOTER)


if __name__ == "__main__":
    main()
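
To make the parsing step in `generate_supported_versions` concrete, here is a small sketch that applies the same `line.split(maxsplit=1)[0][1:]` expression to a few lines taken from the `version_date.tsv` hunk in this commit. The helper name `year_month_labels` and the tab separator in `SAMPLE_LINES` are assumptions made for illustration; the script itself reads the real file.

```python
from typing import List

# Sample lines copied from the version_date.tsv hunk; the tab separator is assumed.
SAMPLE_LINES = [
    "v22.9.1.2603-stable\t2022-09-22\n",
    "v22.8.9.24-lts\t2022-11-19\n",
    "v22.8.8.3-lts\t2022-10-27\n",
]


def year_month_labels(lines: List[str]) -> List[str]:
    """Hypothetical helper: reduce full version strings to unique Y.M labels."""
    labels: List[str] = []
    for line in lines:
        # Take the first whitespace-separated field and drop the leading "v".
        version = line.split(maxsplit=1)[0][1:]
        # Only the first two dot-separated components (year, month) matter.
        year, month = version.split(".")[:2]
        label = f"{int(year)}.{int(month)}"
        if label not in labels:
            labels.append(label)
    return labels


print(year_month_labels(SAMPLE_LINES))
```

Because patch releases of the same `Y.M` series collapse to one label, the support table gets one row per series rather than one per tag.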