Compare commits

...

10 Commits

Author                 SHA1        Message                                                      Date

robot-ch-test-poll1    68aebf4c8a  Merge 675d524663 into ef046d2839                             2024-11-21 00:09:46 +01:00
robot-clickhouse-ci-1  ef046d2839  Merge pull request #72167 from ClickHouse/backport/24.9/71761 (Backport #71761 to 24.9: Forbid Dynamic/Variant types in min/max functions to avoid confusion)  2024-11-20 18:15:05 +01:00
robot-ch-test-poll3    d93b2fbb0d  Merge pull request #72157 from ClickHouse/backport/24.9/71982 (Backport #71982 to 24.9: Allow only SELECT queries in EXPLAIN AST used inside subquery)  2024-11-20 17:10:24 +01:00
robot-clickhouse       4872a45a6e  Backport #71761 to 24.9: Forbid Dynamic/Variant types in min/max functions to avoid confusion  2024-11-20 15:08:03 +00:00
robot-clickhouse       72515d7db6  Backport #71982 to 24.9: Allow only SELECT queries in EXPLAIN AST used inside subquery  2024-11-20 14:07:48 +00:00
robot-clickhouse-ci-1  0a205054b6  Merge pull request #72143 from ClickHouse/backport/24.9/71845 (Backport #71845 to 24.9: Acquire zero-copy shared lock before moving a part)  2024-11-20 14:18:22 +01:00
robot-clickhouse       607ebf50fb  Backport #71845 to 24.9: Acquire zero-copy shared lock before moving a part  2024-11-20 11:07:55 +00:00
robot-clickhouse-ci-2  1709e66307  Merge pull request #72116 from ClickHouse/backport/24.9/72080 (Backport #72080 to 24.9: Fix formatting of `MOVE PARTITION ... TO TABLE ...` alter commands)  2024-11-20 04:40:01 +01:00
robot-clickhouse       0d1baa57a4  Backport #72080 to 24.9: Fix formatting of MOVE PARTITION ... TO TABLE ... alter commands  2024-11-20 01:36:18 +00:00
robot-clickhouse       675d524663  Backport #70915 to 24.9: Cache HEAD API requests to object storage in the plain_rewritable disk  2024-10-31 04:53:25 +00:00
35 changed files with 588 additions and 131 deletions

View File

@ -79,6 +79,14 @@ public:
"Illegal type {} of second argument of aggregate function {} because the values of that data type are not comparable",
type_val->getName(),
getName());
if (isDynamic(this->type_val) || isVariant(this->type_val))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of aggregate function {} because the column of that type can contain values with different "
"data types. Consider using typed subcolumns or cast column to a specific data type",
this->type_val->getName(),
getName());
}
void create(AggregateDataPtr __restrict place) const override /// NOLINT
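
The new check rejects Dynamic and Variant arguments in min/max-style aggregates; the error text points to typed subcolumns or an explicit cast as the workaround. A minimal SQL sketch of that workaround (the table and column names here are illustrative, not part of this change):

SET allow_experimental_dynamic_type = 1;

CREATE TABLE t (id UInt64, d Dynamic) ENGINE = MergeTree ORDER BY id;
INSERT INTO t VALUES (1, 42::UInt64), (2, 'abc');

SELECT max(d) FROM t;          -- now rejected with ILLEGAL_TYPE_OF_ARGUMENT
SELECT max(d.UInt64) FROM t;   -- workaround: aggregate the typed subcolumn instead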

View File

@ -35,6 +35,14 @@ public:
"Illegal type {} of argument of aggregate function {} because the values of that data type are not comparable",
this->result_type->getName(),
getName());
if (isDynamic(this->result_type) || isVariant(this->result_type))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of aggregate function {} because the column of that type can contain values with different "
"data types. Consider using typed subcolumns or cast column to a specific data type",
this->result_type->getName(),
getName());
}
String getName() const override

View File

@ -63,6 +63,14 @@ public:
"Illegal type {} for combinator {} because the values of that data type are not comparable",
arguments[key_col]->getName(),
getName());
if (isDynamic(arguments[key_col]) || isVariant(arguments[key_col]))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of aggregate function {} because the column of that type can contain values with different "
"data types. Consider using typed subcolumns or cast column to a specific data type",
arguments[key_col]->getName(),
getName());
}
String getName() const override

View File

@ -65,6 +65,9 @@ static struct InitFiu
PAUSEABLE(infinite_sleep) \
PAUSEABLE(stop_moving_part_before_swap_with_active) \
REGULAR(slowdown_index_analysis) \
REGULAR(replicated_merge_tree_all_replicas_stale) \
REGULAR(zero_copy_lock_zk_fail_before_op) \
REGULAR(zero_copy_lock_zk_fail_after_op) \
namespace FailPoints

View File

@ -1,5 +1,5 @@
#include <Disks/ObjectStorages/CommonPathPrefixKeyGenerator.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Common/SharedLockGuard.h>
#include <Common/getRandomASCIIString.h>
@ -11,7 +11,7 @@
namespace DB
{
CommonPathPrefixKeyGenerator::CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_)
CommonPathPrefixKeyGenerator::CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_)
: storage_key_prefix(key_prefix_), path_map(std::move(path_map_))
{
}
@ -59,7 +59,7 @@ std::tuple<std::string, std::vector<std::string>> CommonPathPrefixKeyGenerator::
if (it != ptr->map.end())
{
std::vector<std::string> vec(std::make_move_iterator(dq.begin()), std::make_move_iterator(dq.end()));
return std::make_tuple(it->second, std::move(vec));
return std::make_tuple(it->second.path, std::move(vec));
}
if (!p.filename().empty())

View File

@ -20,13 +20,13 @@ namespace DB
/// The key generator ensures that the original directory hierarchy is
/// preserved, which is required for the MergeTree family.
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
class CommonPathPrefixKeyGenerator : public IObjectStorageKeysGenerator
{
public:
/// Local to remote path map. Leverages filesystem::path comparator for paths.
explicit CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_);
explicit CommonPathPrefixKeyGenerator(String key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_);
ObjectStorageKey generate(const String & path, bool is_directory, const std::optional<String> & key_prefix) const override;
@ -36,7 +36,7 @@ private:
const String storage_key_prefix;
std::weak_ptr<InMemoryPathMap> path_map;
std::weak_ptr<InMemoryDirectoryPathMap> path_map;
};
}

View File

@ -1,5 +1,5 @@
#include "FlatDirectoryStructureKeyGenerator.h"
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include "Common/ObjectStorageKey.h"
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
@ -12,7 +12,8 @@
namespace DB
{
FlatDirectoryStructureKeyGenerator::FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_)
FlatDirectoryStructureKeyGenerator::FlatDirectoryStructureKeyGenerator(
String storage_key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_)
: storage_key_prefix(storage_key_prefix_), path_map(std::move(path_map_))
{
}
@ -31,11 +32,11 @@ ObjectStorageKey FlatDirectoryStructureKeyGenerator::generate(const String & pat
SharedLockGuard lock(ptr->mutex);
auto it = ptr->map.find(p);
if (it != ptr->map.end())
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, it->second);
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, it->second.path);
it = ptr->map.find(directory);
if (it != ptr->map.end())
remote_path = it->second;
remote_path = it->second.path;
}
constexpr size_t part_size = 32;
std::filesystem::path key = remote_path.has_value() ? *remote_path

View File

@ -6,18 +6,18 @@
namespace DB
{
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
class FlatDirectoryStructureKeyGenerator : public IObjectStorageKeysGenerator
{
public:
explicit FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryPathMap> path_map_);
explicit FlatDirectoryStructureKeyGenerator(String storage_key_prefix_, std::weak_ptr<InMemoryDirectoryPathMap> path_map_);
ObjectStorageKey generate(const String & path, bool is_directory, const std::optional<String> & key_prefix) const override;
private:
const String storage_key_prefix;
std::weak_ptr<InMemoryPathMap> path_map;
std::weak_ptr<InMemoryDirectoryPathMap> path_map;
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <memory>
#include <optional>
#include <vector>
#include <unordered_map>
#include <Poco/Timestamp.h>
@ -190,8 +191,22 @@ public:
virtual uint64_t getFileSize(const std::string & path) const = 0;
virtual std::optional<uint64_t> getFileSizeIfExists(const std::string & path) const
{
if (isFile(path))
return getFileSize(path);
return std::nullopt;
}
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
virtual std::optional<Poco::Timestamp> getLastModifiedIfExists(const std::string & path) const
{
if (exists(path))
return getLastModified(path);
return std::nullopt;
}
virtual time_t getLastChanged(const std::string & /* path */) const
{
throwNotImplemented();

View File

@ -2,14 +2,17 @@
#include <filesystem>
#include <map>
#include <optional>
#include <shared_mutex>
#include <base/defines.h>
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
namespace DB
{
struct InMemoryPathMap
struct InMemoryDirectoryPathMap
{
struct PathComparator
{
@ -22,8 +25,27 @@ struct InMemoryPathMap
return path1 < path2;
}
};
/// Local -> Remote path.
using Map = std::map<std::filesystem::path, std::string, PathComparator>;
struct RemotePathInfo
{
std::string path;
time_t last_modified = 0;
};
using Map = std::map<std::filesystem::path, RemotePathInfo, PathComparator>;
std::optional<RemotePathInfo> getRemotePathInfoIfExists(const std::string & path)
{
auto base_path = path;
if (base_path.ends_with('/'))
base_path.pop_back();
SharedLockGuard lock(mutex);
auto it = map.find(base_path);
if (it == map.end())
return std::nullopt;
return it->second;
}
mutable SharedMutex mutex;
#ifdef OS_LINUX

View File

@ -116,7 +116,8 @@ void registerPlainMetadataStorage(MetadataStorageFactory & factory)
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromPlainObjectStorage>(object_storage, key_compatibility_prefix);
return std::make_shared<MetadataStorageFromPlainObjectStorage>(
object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".object_metadata_cache_size", 0));
});
}
@ -130,7 +131,8 @@ void registerPlainRewritableMetadataStorage(MetadataStorageFactory & factory)
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorage>(object_storage, key_compatibility_prefix);
return std::make_shared<MetadataStorageFromPlainRewritableObjectStorage>(
object_storage, key_compatibility_prefix, config.getUInt64(config_prefix + ".object_metadata_cache_size", 0));
});
}

View File

@ -1,18 +1,32 @@
#include "MetadataStorageFromPlainObjectStorage.h"
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.h>
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Common/ObjectStorageKey.h>
#include <Common/SipHash.h>
#include <Common/filesystemHelpers.h>
#include <filesystem>
#include <memory>
#include <optional>
#include <tuple>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
}
namespace
{
@ -23,10 +37,12 @@ std::filesystem::path normalizeDirectoryPath(const std::filesystem::path & path)
}
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_)
: object_storage(object_storage_)
, storage_path_prefix(std::move(storage_path_prefix_))
MetadataStorageFromPlainObjectStorage::MetadataStorageFromPlainObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size)
: object_storage(object_storage_), storage_path_prefix(std::move(storage_path_prefix_))
{
if (object_metadata_cache_size)
object_metadata_cache.emplace(object_metadata_cache_size);
}
MetadataTransactionPtr MetadataStorageFromPlainObjectStorage::createTransaction()
@ -63,13 +79,35 @@ bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path
uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
auto metadata = object_storage->tryGetObjectMetadata(object_key.serialize());
if (metadata)
return metadata->size_bytes;
if (auto res = getFileSizeIfExists(path))
return *res;
/// Throws a FILE_DOESNT_EXIST exception in newer releases.
return 0;
}
std::optional<uint64_t> MetadataStorageFromPlainObjectStorage::getFileSizeIfExists(const String & path) const
{
if (auto res = getObjectMetadataEntryWithCache(path))
return res->file_size;
return std::nullopt;
}
Poco::Timestamp MetadataStorageFromPlainObjectStorage::getLastModified(const std::string & path) const
{
if (auto res = getLastModifiedIfExists(path))
return *res;
else
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File or directory {} does not exist on {}", path, object_storage->getName());
}
std::optional<Poco::Timestamp> MetadataStorageFromPlainObjectStorage::getLastModifiedIfExists(const std::string & path) const
{
/// Since the plain object storage is used for backups only, return the current time.
if (exists(path))
return Poco::Timestamp{};
return std::nullopt;
}
std::vector<std::string> MetadataStorageFromPlainObjectStorage::listDirectory(const std::string & path) const
{
auto key_prefix = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */).serialize();
@ -114,6 +152,31 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
return {StoredObject(object_key.serialize(), path, object_size)};
}
MetadataStorageFromPlainObjectStorage::ObjectMetadataEntryPtr
MetadataStorageFromPlainObjectStorage::getObjectMetadataEntryWithCache(const std::string & path) const
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
auto get = [&] -> ObjectMetadataEntryPtr
{
if (auto metadata = object_storage->tryGetObjectMetadata(object_key.serialize()))
return std::make_shared<ObjectMetadataEntry>(metadata->size_bytes, metadata->last_modified.epochTime());
return nullptr;
};
if (object_metadata_cache)
{
SipHash hash;
hash.update(object_key.serialize());
auto hash128 = hash.get128();
if (auto res = object_metadata_cache->get(hash128))
return res;
if (auto mapped = get())
return object_metadata_cache->getOrSet(hash128, [&] { return mapped; }).first;
return object_metadata_cache->get(hash128);
}
return get();
}
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const
{
return metadata_storage;
@ -178,8 +241,17 @@ void MetadataStorageFromPlainObjectStorageTransaction::addBlobToMetadata(
/// Noop, the local metadata is a single file: the metadata file itself.
}
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string &)
UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromPlainObjectStorageTransaction::unlinkMetadata(const std::string & path)
{
/// The record has become stale, remove it from cache.
if (metadata_storage.object_metadata_cache)
{
auto object_key = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
SipHash hash;
hash.update(object_key.serialize());
metadata_storage.object_metadata_cache->remove(hash.get128());
}
/// No hardlinks, so will always remove file.
return std::make_shared<UnlinkMetadataFileOperationOutcome>(UnlinkMetadataFileOperationOutcome{0});
}

View File

@ -1,19 +1,23 @@
#pragma once
#include <Core/Types.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
#include <Disks/ObjectStorages/MetadataStorageTransactionState.h>
#include <Common/CacheBase.h>
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
{
struct InMemoryPathMap;
struct InMemoryDirectoryPathMap;
struct UnlinkMetadataFileOperationOutcome;
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
@ -33,13 +37,22 @@ private:
friend class MetadataStorageFromPlainObjectStorageTransaction;
protected:
struct ObjectMetadataEntry
{
uint64_t file_size;
time_t last_modified;
};
using ObjectMetadataEntryPtr = std::shared_ptr<ObjectMetadataEntry>;
ObjectStoragePtr object_storage;
String storage_path_prefix;
const String storage_path_prefix;
mutable std::optional<CacheBase<UInt128, ObjectMetadataEntry>> object_metadata_cache;
mutable SharedMutex metadata_mutex;
public:
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_);
MetadataStorageFromPlainObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size);
MetadataTransactionPtr createTransaction() override;
@ -54,6 +67,7 @@ public:
bool isDirectory(const std::string & path) const override;
uint64_t getFileSize(const String & path) const override;
std::optional<uint64_t> getFileSizeIfExists(const String & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
@ -63,11 +77,8 @@ public:
StoredObjects getStorageObjects(const std::string & path) const override;
Poco::Timestamp getLastModified(const std::string & /* path */) const override
{
/// Required by MergeTree
return {};
}
Poco::Timestamp getLastModified(const std::string & path) const override;
std::optional<Poco::Timestamp> getLastModifiedIfExists(const String & path) const override;
uint32_t getHardlinkCount(const std::string & /* path */) const override
{
@ -82,7 +93,9 @@ protected:
virtual std::string getMetadataKeyPrefix() const { return object_storage->getCommonKeyPrefix(); }
/// Returns a map of virtual filesystem paths to paths in the object storage.
virtual std::shared_ptr<InMemoryPathMap> getPathMap() const { throwNotImplemented(); }
virtual std::shared_ptr<InMemoryDirectoryPathMap> getPathMap() const { throwNotImplemented(); }
ObjectMetadataEntryPtr getObjectMetadataEntryWithCache(const std::string & path) const;
};
class MetadataStorageFromPlainObjectStorageTransaction final : public IMetadataTransaction, private MetadataOperationsHolder

View File

@ -1,8 +1,9 @@
#include "MetadataStorageFromPlainObjectStorageOperations.h"
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/Timestamp.h>
#include <Common/Exception.h>
#include <Common/SharedLockGuard.h>
#include <Common/logger_useful.h>
@ -30,7 +31,10 @@ ObjectStorageKey createMetadataObjectKey(const std::string & object_key_prefix,
}
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
std::filesystem::path && path_, InMemoryPathMap & path_map_, ObjectStoragePtr object_storage_, const std::string & metadata_key_prefix_)
std::filesystem::path && path_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path(std::move(path_))
, path_map(path_map_)
, object_storage(object_storage_)
@ -71,7 +75,8 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::execute(std:
{
std::lock_guard lock(path_map.mutex);
auto & map = path_map.map;
[[maybe_unused]] auto result = map.emplace(base_path, object_key_prefix);
[[maybe_unused]] auto result
= map.emplace(base_path, InMemoryDirectoryPathMap::RemotePathInfo{object_key_prefix, Poco::Timestamp{}.epochTime()});
chassert(result.second);
}
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
@ -109,7 +114,7 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::un
MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::MetadataStorageFromPlainObjectStorageMoveDirectoryOperation(
std::filesystem::path && path_from_,
std::filesystem::path && path_to_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path_from(std::move(path_from_))
@ -139,7 +144,7 @@ std::unique_ptr<WriteBufferFromFileBase> MetadataStorageFromPlainObjectStorageMo
throw Exception(
ErrorCodes::FILE_ALREADY_EXISTS, "Metadata object for the new (destination) path '{}' already exists", new_path);
remote_path = expected_it->second;
remote_path = expected_it->second.path;
}
auto metadata_object_key = createMetadataObjectKey(remote_path, metadata_key_prefix);
@ -190,6 +195,7 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::execute(std::u
auto & map = path_map.map;
[[maybe_unused]] auto result = map.emplace(base_path_to, map.extract(base_path_from).mapped());
chassert(result.second);
result.first->second.last_modified = Poco::Timestamp{}.epochTime();
}
write_finalized = true;
@ -213,7 +219,10 @@ void MetadataStorageFromPlainObjectStorageMoveDirectoryOperation::undo(std::uniq
}
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
std::filesystem::path && path_, InMemoryPathMap & path_map_, ObjectStoragePtr object_storage_, const std::string & metadata_key_prefix_)
std::filesystem::path && path_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_)
: path(std::move(path_)), path_map(path_map_), object_storage(object_storage_), metadata_key_prefix(metadata_key_prefix_)
{
chassert(path.string().ends_with('/'));
@ -229,7 +238,7 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std:
auto path_it = map.find(base_path);
if (path_it == map.end())
return;
key_prefix = path_it->second;
key_prefix = path_it->second.path;
}
LOG_TRACE(getLogger("MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation"), "Removing directory '{}'", path);

View File

@ -1,7 +1,7 @@
#pragma once
#include <Disks/ObjectStorages/IMetadataOperation.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
#include <filesystem>
@ -14,7 +14,7 @@ class MetadataStorageFromPlainObjectStorageCreateDirectoryOperation final : publ
{
private:
std::filesystem::path path;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
const std::string object_key_prefix;
@ -26,7 +26,7 @@ public:
MetadataStorageFromPlainObjectStorageCreateDirectoryOperation(
/// path_ must end with a trailing '/'.
std::filesystem::path && path_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);
@ -39,7 +39,7 @@ class MetadataStorageFromPlainObjectStorageMoveDirectoryOperation final : public
private:
std::filesystem::path path_from;
std::filesystem::path path_to;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
@ -54,7 +54,7 @@ public:
/// Both path_from_ and path_to_ must end with a trailing '/'.
std::filesystem::path && path_from_,
std::filesystem::path && path_to_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);
@ -68,7 +68,7 @@ class MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation final : publ
private:
std::filesystem::path path;
InMemoryPathMap & path_map;
InMemoryDirectoryPathMap & path_map;
ObjectStoragePtr object_storage;
const std::string metadata_key_prefix;
@ -79,7 +79,7 @@ public:
MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation(
/// path_ must end with a trailing '/'.
std::filesystem::path && path_,
InMemoryPathMap & path_map_,
InMemoryDirectoryPathMap & path_map_,
ObjectStoragePtr object_storage_,
const std::string & metadata_key_prefix_);

View File

@ -1,15 +1,19 @@
#include <Disks/ObjectStorages/FlatDirectoryStructureKeyGenerator.h>
#include <Disks/ObjectStorages/InMemoryPathMap.h>
#include <Disks/ObjectStorages/InMemoryDirectoryPathMap.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainRewritableObjectStorage.h>
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <cstddef>
#include <exception>
#include <optional>
#include <unordered_set>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/SharedThreadPools.h>
#include "Common/SharedLockGuard.h"
#include "Common/SharedMutex.h"
#include <Common/ErrorCodes.h>
#include <Poco/Timestamp.h>
#include <Common/Exception.h>
#include <Common/SharedLockGuard.h>
#include <Common/SharedMutex.h>
#include <Common/logger_useful.h>
#include "CommonPathPrefixKeyGenerator.h"
@ -41,10 +45,10 @@ std::string getMetadataKeyPrefix(ObjectStoragePtr object_storage)
: metadata_key_prefix;
}
std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_key_prefix, ObjectStoragePtr object_storage)
std::shared_ptr<InMemoryDirectoryPathMap> loadPathPrefixMap(const std::string & metadata_key_prefix, ObjectStoragePtr object_storage)
{
auto result = std::make_shared<InMemoryPathMap>();
using Map = InMemoryPathMap::Map;
auto result = std::make_shared<InMemoryDirectoryPathMap>();
using Map = InMemoryDirectoryPathMap::Map;
ThreadPool & pool = getIOThreadPool().get();
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PlainRWMetaLoad");
@ -74,17 +78,24 @@ std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_
StoredObject object{path};
String local_path;
Poco::Timestamp last_modified{};
try
{
auto read_buf = object_storage->readObject(object, settings);
readStringUntilEOF(local_path, *read_buf);
auto object_metadata = object_storage->tryGetObjectMetadata(path);
/// It is ok if a directory was removed just now.
/// We support attaching a filesystem that is concurrently modified by someone else.
if (!object_metadata)
return;
/// Assuming that the local and object storage clocks are synchronized.
last_modified = object_metadata->last_modified;
}
#if USE_AWS_S3
catch (const S3Exception & e)
{
/// It is ok if a directory was removed just now.
/// We support attaching a filesystem that is concurrently modified by someone else.
if (e.getS3ErrorCode() == Aws::S3::S3Errors::NO_SUCH_KEY)
return;
throw;
@ -102,18 +113,19 @@ std::shared_ptr<InMemoryPathMap> loadPathPrefixMap(const std::string & metadata_
std::pair<Map::iterator, bool> res;
{
std::lock_guard lock(result->mutex);
res = result->map.emplace(std::filesystem::path(local_path).parent_path(), remote_path.parent_path());
res = result->map.emplace(
std::filesystem::path(local_path).parent_path(),
InMemoryDirectoryPathMap::RemotePathInfo{remote_path.parent_path(), last_modified.epochTime()});
}
/// This can happen if table replication is enabled; then the same local path is written
/// in `prefix.path` of each replica.
/// TODO: should replicated tables (e.g., RMT) be explicitly disallowed?
if (!res.second)
LOG_WARNING(
log,
"The local path '{}' is already mapped to a remote path '{}', ignoring: '{}'",
local_path,
res.first->second,
res.first->second.path,
remote_path.parent_path().string());
});
}
@ -133,7 +145,7 @@ void getDirectChildrenOnDiskImpl(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,
const std::string & local_path,
const InMemoryPathMap & path_map,
const InMemoryDirectoryPathMap & path_map,
std::unordered_set<std::string> & result)
{
/// Directories are retrieved from the in-memory path map.
@ -181,8 +193,8 @@ void getDirectChildrenOnDiskImpl(
}
MetadataStorageFromPlainRewritableObjectStorage::MetadataStorageFromPlainRewritableObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_)
: MetadataStorageFromPlainObjectStorage(object_storage_, storage_path_prefix_)
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size)
: MetadataStorageFromPlainObjectStorage(object_storage_, storage_path_prefix_, object_metadata_cache_size)
, metadata_key_prefix(DB::getMetadataKeyPrefix(object_storage))
, path_map(loadPathPrefixMap(metadata_key_prefix, object_storage))
{
@ -213,27 +225,23 @@ MetadataStorageFromPlainRewritableObjectStorage::~MetadataStorageFromPlainRewrit
bool MetadataStorageFromPlainRewritableObjectStorage::exists(const std::string & path) const
{
if (MetadataStorageFromPlainObjectStorage::exists(path))
if (isDirectory(path))
return true;
if (useSeparateLayoutForMetadata())
{
auto key_prefix = object_storage->generateObjectKeyForPath(path, getMetadataKeyPrefix()).serialize();
return object_storage->existsOrHasAnyChild(key_prefix);
}
return getObjectMetadataEntryWithCache(path) != nullptr;
}
return false;
bool MetadataStorageFromPlainRewritableObjectStorage::isFile(const std::string & path) const
{
if (isDirectory(path))
return false;
return getObjectMetadataEntryWithCache(path) != nullptr;
}
bool MetadataStorageFromPlainRewritableObjectStorage::isDirectory(const std::string & path) const
{
if (useSeparateLayoutForMetadata())
{
auto directory = std::filesystem::path(object_storage->generateObjectKeyForPath(path, getMetadataKeyPrefix()).serialize()) / "";
return object_storage->existsOrHasAnyChild(directory);
}
else
return MetadataStorageFromPlainObjectStorage::isDirectory(path);
return path_map->getRemotePathInfoIfExists(path) != std::nullopt;
}
std::vector<std::string> MetadataStorageFromPlainRewritableObjectStorage::listDirectory(const std::string & path) const
@ -260,6 +268,18 @@ std::vector<std::string> MetadataStorageFromPlainRewritableObjectStorage::listDi
return std::vector<std::string>(std::make_move_iterator(directories.begin()), std::make_move_iterator(directories.end()));
}
std::optional<Poco::Timestamp> MetadataStorageFromPlainRewritableObjectStorage::getLastModifiedIfExists(const String & path) const
{
/// Path corresponds to a directory.
if (auto remote = path_map->getRemotePathInfoIfExists(path))
return Poco::Timestamp::fromEpochTime(remote->last_modified);
/// A file.
if (auto res = getObjectMetadataEntryWithCache(path))
return Poco::Timestamp::fromEpochTime(res->last_modified);
return std::nullopt;
}
void MetadataStorageFromPlainRewritableObjectStorage::getDirectChildrenOnDisk(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,

View File

@ -13,20 +13,26 @@ class MetadataStorageFromPlainRewritableObjectStorage final : public MetadataSto
{
private:
const std::string metadata_key_prefix;
std::shared_ptr<InMemoryPathMap> path_map;
std::shared_ptr<InMemoryDirectoryPathMap> path_map;
public:
MetadataStorageFromPlainRewritableObjectStorage(ObjectStoragePtr object_storage_, String storage_path_prefix_);
MetadataStorageFromPlainRewritableObjectStorage(
ObjectStoragePtr object_storage_, String storage_path_prefix_, size_t object_metadata_cache_size);
~MetadataStorageFromPlainRewritableObjectStorage() override;
MetadataStorageType getType() const override { return MetadataStorageType::PlainRewritable; }
bool exists(const std::string & path) const override;
bool isFile(const std::string & path) const override;
bool isDirectory(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
std::optional<Poco::Timestamp> getLastModifiedIfExists(const String & path) const override;
protected:
std::string getMetadataKeyPrefix() const override { return metadata_key_prefix; }
std::shared_ptr<InMemoryPathMap> getPathMap() const override { return path_map; }
std::shared_ptr<InMemoryDirectoryPathMap> getPathMap() const override { return path_map; }
void getDirectChildrenOnDisk(
const std::string & storage_key,
const RelativePathsWithMetadata & remote_paths,

View File

@ -70,8 +70,12 @@ ASTPtr ASTAlterCommand::clone() const
void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
scope_guard closing_bracket_guard;
if (format_alter_commands_with_parentheses)
{
settings.ostr << "(";
closing_bracket_guard = make_scope_guard(std::function<void(void)>([&settings]() { settings.ostr << ")"; }));
}
if (type == ASTAlterCommand::ADD_COLUMN)
{
@ -498,9 +502,6 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
}
else
throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER");
if (format_alter_commands_with_parentheses)
settings.ostr << ")";
}
void ASTAlterCommand::forEachPointerToChild(std::function<void(void**)> f)

View File

@ -140,6 +140,9 @@ bool ParserSubquery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
const ASTPtr & explained_ast = explain_query.getExplainedQuery();
if (explained_ast)
{
if (!explained_ast->as<ASTSelectWithUnionQuery>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "EXPLAIN inside subquery supports only SELECT queries");
auto view_explain = makeASTFunction("viewExplain",
std::make_shared<ASTLiteral>(kind_str),
std::make_shared<ASTLiteral>(settings_str),

View File

@ -524,6 +524,14 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n
return it == serializations.end() ? nullptr : it->second;
}
bool IMergeTreeDataPart::isMovingPart() const
{
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
return part_directory_path.parent_path().filename() == "moving";
}
void IMergeTreeDataPart::removeIfNeeded()
{
assert(assertHasValidVersionMetadata());
@ -548,10 +556,7 @@ void IMergeTreeDataPart::removeIfNeeded()
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
bool is_moving_part = isMovingPart();
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(

View File

@ -429,6 +429,9 @@ public:
bool isProjectionPart() const { return parent_part != nullptr; }
/// Check if the part is in the `/moving` directory
bool isMovingPart() const;
const IMergeTreeDataPart * getParentPart() const { return parent_part; }
String getParentPartName() const { return parent_part_name; }

View File

@ -8038,33 +8038,49 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
/// replica will actually move the part from disk to some
/// zero-copy storage, other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
parts_mover.swapClonedPart(cloned_part);
break;
}
else if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
else
auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk);
if (!lock)
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
LOG_DEBUG(
log,
"Move of part {} postponed, because zero copy mode enabled and zero-copy lock was not acquired",
moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
/// Cloning part can take a long time.
/// Recheck if the lock (and keeper session expiry) is OK
if (lock->isLocked())
{
parts_mover.swapClonedPart(cloned_part);
break; /// Successfully moved
}
else
{
LOG_DEBUG(
log,
"Move of part {} postponed, because zero copy mode enabled and zero-copy lock was lost during cloning the part",
moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
}
if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Waits and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
}
else /// Ordinary move as it should be

View File

@ -231,6 +231,14 @@ void minmaxIndexValidator(const IndexDescription & index, bool attach)
"Data type of argument for minmax index must be comparable, got {} type for column {} instead",
column.type->getName(), column.name);
}
if (isDynamic(column.type) || isVariant(column.type))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} data type of column {} is not allowed in minmax index because the column of that type can contain values with different data "
"types. Consider using typed subcolumns or cast column to a specific data type",
column.type->getName(), column.name);
}
}
}
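
The validator change above applies the same restriction to minmax skip indexes: declaring one directly over a Dynamic or Variant column is rejected with BAD_ARGUMENTS. A brief sketch (hypothetical table names; the accepted variant simply indexes a concretely typed column):

SET allow_experimental_dynamic_type = 1;

CREATE TABLE t_bad (d Dynamic, INDEX idx d TYPE minmax)
ENGINE = MergeTree ORDER BY tuple();   -- now fails with BAD_ARGUMENTS

CREATE TABLE t_ok (v UInt64, INDEX idx v TYPE minmax)
ENGINE = MergeTree ORDER BY tuple();   -- accepted: the column has a single, comparable type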

View File

@ -255,6 +255,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
disk->createDirectories(path_to_clone);
/// TODO: Make it possible to fetch only zero-copy part without fallback to fetching a full-copy one
auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
if (zero_copy_part)
@ -297,6 +298,28 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
return cloned_part;
}
void MergeTreePartsMover::renameClonedPart(IMergeTreeDataPart & part) const
try
{
part.is_temp = false;
/// Mark it DeleteOnDestroy to ensure deleting in destructor
/// if something goes wrong before swapping
part.setState(MergeTreeDataPartState::DeleteOnDestroy);
/// Don't remove new directory but throw an error because it may contain part which is currently in use.
part.renameTo(part.name, /* remove_new_dir_if_exists */ false);
}
catch (...)
{
/// Check if part was renamed or not
/// `renameTo()` does not provide strong exception guarantee in case of an exception
if (part.isMovingPart())
{
/// Restore its temporary state
part.is_temp = true;
part.setState(MergeTreeDataPartState::Temporary);
}
throw;
}
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
@ -323,12 +346,23 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons
return;
}
cloned_part.part->is_temp = false;
/// It is safe to acquire zero-copy lock for the temporary part here
/// because no one can fetch it until it is *swapped*.
///
/// Set ASK_KEEPER to try to unlock it in destructor if something goes wrong before *renaming*
/// If unlocking fails we will not get a stuck part in the moving directory
/// because it will be renamed to delete_tmp_<name> beforehand and cleaned up later.
/// Worst outcomes: trash in object storage and/or orphaned shared zero-copy lock. It is acceptable.
/// See DataPartStorageOnDiskBase::remove().
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
data->lockSharedData(*cloned_part.part, /* replace_existing_lock = */ true);
/// Don't remove new directory but throw an error because it may contain part which is currently in use.
cloned_part.part->renameTo(active_part->name, false);
renameClonedPart(*cloned_part.part);
/// TODO: what happens if the server goes down here?
/// If the server goes down here we will get two copies of the part with the same name on different disks.
/// And on the next ClickHouse startup during loading parts the first copy (in the order of defining disks
/// in the storage policy) will be loaded as Active, the second one will be loaded as Outdated and removed as duplicate.
/// See MergeTreeData::loadDataParts().
data->swapActivePart(cloned_part.part, part_lock);
LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());

View File

@ -75,6 +75,9 @@ public:
/// merge or mutation.
void swapClonedPart(TemporaryClonedPart & cloned_part) const;
/// Rename cloned part from `moving/` directory to the actual part storage
void renameClonedPart(IMergeTreeDataPart & part) const;
/// Can stop background moves and moves from queries
ActionBlocker moves_blocker;

View File

@ -175,6 +175,8 @@ namespace FailPoints
extern const char replicated_queue_fail_next_entry[];
extern const char replicated_queue_unfail_entries[];
extern const char finish_set_quorum_failed_parts[];
extern const char zero_copy_lock_zk_fail_before_op[];
extern const char zero_copy_lock_zk_fail_after_op[];
}
namespace ErrorCodes
@ -10419,6 +10421,10 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
Coordination::Requests ops;
Coordination::Responses responses;
getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
fiu_do_on(FailPoints::zero_copy_lock_zk_fail_before_op, { zookeeper->forceFailureBeforeOperation(); });
fiu_do_on(FailPoints::zero_copy_lock_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); });
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
{

View File

@ -2,7 +2,9 @@
<storage_configuration>
<disks>
<disk_s3_plain_rewritable>
<type>s3_plain_rewritable</type>
<type>object_storage</type>
<object_storage_type>s3</object_storage_type>
<metadata_type>plain_rewritable</metadata_type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<access_key_id>minio</access_key_id>
@ -15,6 +17,16 @@
<max_size>1000000000</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</disk_cache_s3_plain_rewritable>
<disk_s3_plain_rewritable_with_metadata_cache>
<type>object_storage</type>
<object_storage_type>s3</object_storage_type>
<metadata_type>plain_rewritable</metadata_type>
<endpoint>http://minio1:9001/root/data_with_cache/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<object_metadata_cache_size>1000</object_metadata_cache_size>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3_plain_rewritable_with_metadata_cache>
</disks>
<policies>
<s3_plain_rewritable>
@ -31,6 +43,13 @@
</main>
</volumes>
</cache_s3_plain_rewritable>
<s3_plain_rewritable_with_metadata_cache>
<volumes>
<main>
<disk>disk_s3_plain_rewritable_with_metadata_cache</disk>
</main>
</volumes>
</s3_plain_rewritable_with_metadata_cache>
</policies>
</storage_configuration>
</clickhouse>
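
The new disk and policy above are what the updated integration test below runs against; a table opts in through the usual storage_policy setting (a minimal sketch, the table schema is illustrative):

CREATE TABLE t_metadata_cache (key UInt64, value String)
ENGINE = MergeTree
ORDER BY key
SETTINGS storage_policy = 's3_plain_rewritable_with_metadata_cache';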

View File

@ -44,13 +44,14 @@ def start_cluster():
@pytest.mark.parametrize(
"storage_policy",
"storage_policy,key_prefix",
[
pytest.param("s3_plain_rewritable"),
pytest.param("cache_s3_plain_rewritable"),
pytest.param("s3_plain_rewritable", "data/"),
pytest.param("cache_s3_plain_rewritable", "data/"),
pytest.param("s3_plain_rewritable_with_metadata_cache", "data_with_cache/"),
],
)
def test(storage_policy):
def test(storage_policy, key_prefix):
def create_insert(node, insert_values):
node.query(
"""
@ -140,7 +141,7 @@ def test(storage_policy):
)
metadata_it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True
cluster.minio_bucket, key_prefix, recursive=True
)
metadata_count = 0
for obj in list(metadata_it):
@ -157,7 +158,7 @@ def test(storage_policy):
node.query("DROP TABLE IF EXISTS test SYNC")
it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True
cluster.minio_bucket, key_prefix, recursive=True
)
assert len(list(it)) == 0

View File

@ -7,21 +7,21 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s31>
<s31_again>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s31_again>
<s32>
<type>s3</type>
<endpoint>http://minio1:9001/root/data2/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s32>
</disks>
<policies>

View File

@ -1,9 +1,11 @@
import datetime
import logging
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@ -76,15 +78,19 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
assert get_large_objects_count(cluster, size=size) == expected
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
def wait_for_active_parts(
node, num_expected_parts, table_name, timeout=30, disk_name=None
):
deadline = time.monotonic() + timeout
num_parts = 0
while time.monotonic() < deadline:
num_parts_str = node.query(
"select count() from system.parts where table = '{}' and active".format(
table_name
)
query = (
f"select count() from system.parts where table = '{table_name}' and active"
)
if disk_name:
query += f" and disk_name='{disk_name}'"
num_parts_str = node.query(query)
num_parts = int(num_parts_str.strip())
if num_parts == num_expected_parts:
return
@ -94,6 +100,22 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
assert num_parts == num_expected_parts
@pytest.fixture(scope="function")
def test_name(request):
return request.node.name
@pytest.fixture(scope="function")
def test_table(test_name):
normalized = (
test_name.replace("[", "_")
.replace("]", "_")
.replace(" ", "_")
.replace("-", "_")
)
return "table_" + normalized
# Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
@pytest.mark.order(0)
@pytest.mark.parametrize("policy", ["s3"])
@ -667,3 +689,111 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
time.sleep(10)
check_objects_not_exisis(cluster, objectsY)
@pytest.mark.parametrize(
"failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
)
def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
f"""
CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
ORDER BY date PARTITION BY date
SETTINGS storage_policy='hybrid'
"""
)
date = "2024-10-23"
node2.query(f"SYSTEM STOP FETCHES {test_table}")
node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
# Try to move and fail on acquiring the zero-copy shared lock
node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
node1.query_and_get_error(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# After the failure the part must remain on the source disk
assert (
node1.query(
f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
)
== "default\n"
)
# Try another attempt after zk connection is restored
# It should not fail due to leftovers of the previous attempt (temporary cloned files)
node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
node1.query(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
assert (
node1.query(
f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
)
== "s31\n"
)
# Sanity check
node2.query(f"SYSTEM START FETCHES {test_table}")
wait_for_active_parts(node2, 1, test_table, disk_name="s31")
assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
f"""
CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
ORDER BY date PARTITION BY date
SETTINGS storage_policy='hybrid'
"""
)
date = "2024-10-23"
node2.query(f"SYSTEM STOP FETCHES {test_table}")
node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
# Pause moving after part cloning, but before swapping
node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
def move(node):
node.query_and_get_error(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# Start moving
t1 = threading.Thread(target=move, args=[node1])
t1.start()
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node1)
# Continue moving and try to swap
node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
t1.join()
# The previous MOVE failed, try another one after the zk connection is restored
# It should not fail due to leftovers of the previous attempt (temporary cloned files)
node1.query_with_retry(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# Sanity check
node2.query(f"SYSTEM START FETCHES {test_table}")
wait_for_active_parts(node2, 1, test_table, disk_name="s31")
assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")

View File

@ -43,3 +43,10 @@ ALTER TABLE a\\n (DROP COLUMN b),\\n (DROP COLUMN c)
"""
result = node.query(INPUT)
assert result == EXPECTED_OUTPUT
def test_move_partition_to_table_command():
INPUT = "SELECT formatQuery('ALTER TABLE a MOVE PARTITION tuple() TO TABLE b')"
EXPECTED_OUTPUT = "ALTER TABLE a\\n (MOVE PARTITION tuple() TO TABLE b)\n"
result = node.query(INPUT)
assert result == EXPECTED_OUTPUT

View File

@ -0,0 +1,18 @@
set allow_experimental_dynamic_type=1;
select max(number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select min(number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select argMax(number, number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select argMin(number, number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select anyArgMax(number, number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select anyArgMin(number, number::Dynamic) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
create table test (d Dynamic, index idx d type minmax); -- {serverError BAD_ARGUMENTS}
set allow_experimental_variant_type=1;
select max(number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select min(number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select argMax(number, number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select argMin(number, number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select anyArgMax(number, number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
select anyArgMin(number, number::Variant(UInt64)) from numbers(10); -- {serverError ILLEGAL_TYPE_OF_ARGUMENT}
create table test (d Variant(UInt64), index idx d type minmax); -- {serverError BAD_ARGUMENTS}

View File

@ -0,0 +1,11 @@
SelectWithUnionQuery (children 1)
ExpressionList (children 1)
SelectQuery (children 2)
ExpressionList (children 1)
Asterisk
TablesInSelectQuery (children 1)
TablesInSelectQueryElement (children 1)
TableExpression (children 1)
Function numbers (children 1)
ExpressionList (children 1)
Literal UInt64_10

View File

@ -0,0 +1,5 @@
SELECT * FROM ( EXPLAIN AST SELECT * FROM numbers(10) );
SELECT * FROM ( EXPLAIN AST CREATE TABLE test ENGINE=Memory ); -- {clientError BAD_ARGUMENTS}
SELECT * FROM ( EXPLAIN AST CREATE MATERIALIZED VIEW mv (data String) AS SELECT data FROM table ); -- {clientError BAD_ARGUMENTS}
SELECT * FROM ( EXPLAIN AST INSERT INTO TABLE test VALUES); -- {clientError BAD_ARGUMENTS}
SELECT * FROM ( EXPLAIN AST ALTER TABLE test MODIFY COLUMN x UInt32 ); -- {clientError BAD_ARGUMENTS}