Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-14 19:45:11 +00:00)

Merge branch 'ClickHouse:master' into master

Commit 4f8c4cd4a8
@@ -2267,6 +2267,21 @@ try
        throw;
    }

    bool found_stop_flag = false;

    if (has_zookeeper && global_context->getMacros()->getMacroMap().contains("replica"))
    {
        auto zookeeper = global_context->getZooKeeper();
        String stop_flag_path = "/clickhouse/stop_replicated_ddl_queries/{replica}";
        stop_flag_path = global_context->getMacros()->expand(stop_flag_path);
        found_stop_flag = zookeeper->exists(stop_flag_path);
    }

    if (found_stop_flag)
        LOG_INFO(log, "Found a stop flag for replicated DDL queries. They will be disabled");
    else
        DatabaseCatalog::instance().startReplicatedDDLQueries();

    LOG_DEBUG(log, "Loaded metadata.");

    if (has_trace_collector)
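The stop-flag path above contains a {replica} macro that is expanded from the server's configured macros before the ZooKeeper existence check. A minimal standalone sketch of that substitution (not ClickHouse code; the map-based expander and the macro value are illustrative):

#include <iostream>
#include <map>
#include <string>

// Toy stand-in for Macros::expand(): replaces "{name}" tokens with configured values.
static std::string expandMacros(std::string path, const std::map<std::string, std::string> & macros)
{
    for (const auto & [name, value] : macros)
    {
        const std::string token = "{" + name + "}";
        if (auto pos = path.find(token); pos != std::string::npos)
            path.replace(pos, token.size(), value);
    }
    return path;
}

int main()
{
    // Hypothetical value taken from a server's <macros><replica> config entry.
    std::map<std::string, std::string> macros{{"replica", "replica_01"}};

    std::string stop_flag_path = expandMacros("/clickhouse/stop_replicated_ddl_queries/{replica}", macros);
    std::cout << stop_flag_path << '\n';  // /clickhouse/stop_replicated_ddl_queries/replica_01

    // The server then calls zookeeper->exists(stop_flag_path); if the node exists,
    // replicated DDL queries stay disabled after metadata is loaded.
}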
@@ -9,6 +9,8 @@

#include <memory>

#include "config.h"


namespace Poco
{

@@ -12,6 +12,7 @@

#include "config.h"


namespace DB
{

@@ -193,6 +193,7 @@ enum class AccessType : uint8_t
    M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \
    M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
    M(SYSTEM_VIRTUAL_PARTS_UPDATE, "SYSTEM STOP VIRTUAL PARTS UPDATE, SYSTEM START VIRTUAL PARTS UPDATE, STOP VIRTUAL PARTS UPDATE, START VIRTUAL PARTS UPDATE", TABLE, SYSTEM) \
    M(SYSTEM_REDUCE_BLOCKING_PARTS, "SYSTEM STOP REDUCE BLOCKING PARTS, SYSTEM START REDUCE BLOCKING PARTS, STOP REDUCE BLOCKING PARTS, START REDUCE BLOCKING PARTS", TABLE, SYSTEM) \
    M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
    M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
    M(SYSTEM_REPLICA_READINESS, "SYSTEM REPLICA READY, SYSTEM REPLICA UNREADY", GLOBAL, SYSTEM) \
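The M(...) lines above extend an X-macro list: one list of entries is expanded with different definitions of M to generate the enum and its string table. A small self-contained sketch of the pattern (the two-argument signature and the entries here are simplified, not the real ClickHouse list):

#include <cstdint>
#include <iostream>

// One list, expanded several times with different meanings of M.
#define APPLY_FOR_ACCESS_TYPES(M) \
    M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP/START REPLICATION QUEUES") \
    M(SYSTEM_REDUCE_BLOCKING_PARTS, "SYSTEM STOP/START REDUCE BLOCKING PARTS")

enum class AccessTypeSketch : uint8_t
{
#define DECLARE_ENUM(name, aliases) name,
    APPLY_FOR_ACCESS_TYPES(DECLARE_ENUM)
#undef DECLARE_ENUM
};

const char * toString(AccessTypeSketch type)
{
    switch (type)
    {
#define CASE(name, aliases) case AccessTypeSketch::name: return #name;
        APPLY_FOR_ACCESS_TYPES(CASE)
#undef CASE
    }
    return "UNKNOWN";
}

int main()
{
    std::cout << toString(AccessTypeSketch::SYSTEM_REDUCE_BLOCKING_PARTS) << '\n';
}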
@@ -22,6 +22,10 @@ public:
        const std::vector<UUID> & current_roles,
        const std::vector<UUID> & current_roles_with_admin_option);

    std::shared_ptr<const EnabledRoles> getEnabledRoles(
        boost::container::flat_set<UUID> current_roles,
        boost::container::flat_set<UUID> current_roles_with_admin_option);

private:
    using SubscriptionsOnRoles = std::vector<std::shared_ptr<scope_guard>>;

@@ -284,7 +284,8 @@ TEST(AccessRights, Union)
        "CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, "
        "TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, "
        "SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
        "SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, "
        "SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, "
        "SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, SYSTEM REDUCE BLOCKING PARTS, "
        "SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
        "SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, "
        "SYSTEM UNLOAD PRIMARY KEY, dictGet ON db1.*, GRANT TABLE ENGINE ON db1, "

@@ -68,6 +68,11 @@ UUID loadServerUUID(const fs::path & server_uuid_file, Poco::Logger * log)
    }
}

void ServerUUID::set(UUID & uuid)
{
    server_uuid = uuid;
}

void ServerUUID::setRandomForUnitTests()
{
    server_uuid = UUIDHelpers::generateV4();

@@ -20,6 +20,9 @@ public:
    /// Loads server UUID from file or creates new one. Should be called on daemon startup.
    static void load(const fs::path & server_uuid_file, Poco::Logger * log);

    /// Sets specific server UUID.
    static void set(UUID & uuid);

    static void setRandomForUnitTests();
};

@@ -64,6 +64,9 @@ namespace UUIDHelpers
    /// Generate random UUID.
    UUID generateV4();

    /// Generate UUID from hash of a string.
    UUID makeUUIDv4FromHash(const String & string);

    constexpr size_t HighBytes = (std::endian::native == std::endian::little) ? 0 : 1;
    constexpr size_t LowBytes = (std::endian::native == std::endian::little) ? 1 : 0;
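HighBytes/LowBytes above select which 64-bit half of the UUID holds the most significant bytes depending on host endianness. A standalone sketch of the same idea (the UInt128Sketch type and the values are illustrative):

#include <bit>
#include <cstddef>
#include <cstdint>
#include <iostream>

// On little-endian hosts the high half is stored at index 0, on big-endian at index 1.
constexpr size_t HighBytes = (std::endian::native == std::endian::little) ? 0 : 1;
constexpr size_t LowBytes = (std::endian::native == std::endian::little) ? 1 : 0;

struct UInt128Sketch
{
    uint64_t items[2];
};

int main()
{
    UInt128Sketch uuid{};
    uuid.items[HighBytes] = 0x0123456789abcdefULL;  // half treated as the most significant 64 bits
    uuid.items[LowBytes] = 0xfedcba9876543210ULL;
    std::cout << std::hex << uuid.items[0] << ' ' << uuid.items[1] << '\n';
}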
@@ -85,6 +85,7 @@ namespace ErrorCodes
    extern const int NO_ACTIVE_REPLICAS;
    extern const int CANNOT_GET_REPLICATED_DATABASE_SNAPSHOT;
    extern const int CANNOT_RESTORE_TABLE;
    extern const int QUERY_IS_PROHIBITED;
    extern const int SUPPORT_IS_DISABLED;
}

@@ -1057,6 +1058,9 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
{
    waitDatabaseStarted();

    if (!DatabaseCatalog::instance().canPerformReplicatedDDLQueries())
        throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Replicated DDL queries are disabled");

    if (query_context->getCurrentTransaction() && query_context->getSettingsRef()[Setting::throw_on_unsupported_query_inside_transaction])
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed DDL queries inside transactions are not supported");

@@ -1237,14 +1241,16 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
        String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name));
        auto query_context = Context::createCopy(getContext());
        query_context->setSetting("allow_deprecated_database_ordinary", 1);
        executeQuery(query, query_context, QueryFlags{.internal = true});
        query_context->setSetting("cloud_mode", false);
        executeQuery(query, query_context, QueryFlags{ .internal = true });

        /// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work
        /// if zookeeper_path contains {uuid} macro. Replicated database do not recreate replicated tables on recovery,
        /// so it's ok to save UUID of replicated table.
        query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Atomic", backQuoteIfNeed(to_db_name_replicated));
        query_context = Context::createCopy(getContext());
        executeQuery(query, query_context, QueryFlags{.internal = true});
        query_context->setSetting("cloud_mode", false);
        executeQuery(query, query_context, QueryFlags{ .internal = true });
    }

    size_t moved_tables = 0;

@@ -1634,7 +1640,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
    auto table = tryGetTable(table_name, getContext());
    if (!table)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} doesn't exist", table_name);
    if (table->getName() == "MaterializedView" || table->getName() == "WindowView")
    if (table->getName() == "MaterializedView" || table->getName() == "WindowView" || table->getName() == "SharedSet" || table->getName() == "SharedJoin")
    {
        /// Avoid recursive locking of metadata_mutex
        table->dropInnerTableIfAny(sync, local_context);

@@ -43,6 +43,8 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
    context->setSetting("enable_zstd_qat_codec", 1);
    context->setSetting("allow_create_index_without_type", 1);
    context->setSetting("allow_experimental_s3queue", 1);

    /// clickhouse-private settings
    context->setSetting("allow_experimental_shared_set_join", 1);
}

@@ -26,6 +26,9 @@ namespace DB
namespace Setting
{
    extern const SettingsSeconds max_execution_time;

    /// Cloud only
    extern const SettingsBool cloud_mode;
}

namespace ErrorCodes

@@ -33,6 +36,7 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
    extern const int DICTIONARY_ACCESS_DENIED;
    extern const int UNSUPPORTED_METHOD;
    extern const int SUPPORT_IS_DISABLED;
}

ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(

@@ -192,6 +196,9 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
        const std::string & /* default_database */,
        bool created_from_ddl) -> DictionarySourcePtr
    {
        if (global_context->getSettingsRef()[Setting::cloud_mode])
            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Dictionary source of type `executable pool` is disabled");

        if (dict_struct.has_expressions)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable_pool` does not support attribute expressions");

@@ -29,7 +29,6 @@ namespace DB
        ContextPtr global_context,
        const std::string & /* default_database */,
        bool /* created_from_ddl */) -> DictionarySourcePtr {

        auto redis_config_prefix = config_prefix + ".redis";

        auto host = config.getString(redis_config_prefix + ".host");

@@ -28,6 +28,9 @@ namespace Setting
{
    extern const SettingsSeconds http_receive_timeout;
    extern const SettingsBool odbc_bridge_use_connection_pooling;

    /// Cloud only
    extern const SettingsBool cloud_mode;
}

namespace ErrorCodes

@@ -242,6 +245,9 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
        ContextPtr global_context,
        const std::string & /* default_database */,
        bool /* check_config */) -> DictionarySourcePtr {

        if (global_context->getSettingsRef()[Setting::cloud_mode])
            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Dictionary source of type `odbc` is disabled");
#if USE_ODBC
        BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(
            global_context,

@@ -313,6 +313,8 @@ public:
            return std::make_shared<FakeDiskTransaction>(*this);
        }

        /// Need to overwrite explicetly because this disk change
        /// a lot of "delegate" methods.
        return createEncryptedTransaction();
    }

@@ -1,6 +1,5 @@
#include <Disks/DiskEncryptedTransaction.h>


#if USE_SSL
#include <IO/FileEncryptionCommon.h>
#include <Common/Exception.h>

@@ -27,9 +27,11 @@ enum class MetadataStorageType : uint8_t
{
    None,
    Local,
    Keeper,
    Plain,
    PlainRewritable,
    StaticWeb,
    Memory,
};

MetadataStorageType metadataTypeFromString(const String & type);

@@ -497,7 +497,7 @@ public:


protected:
    friend class DiskDecorator;
    friend class DiskReadOnlyWrapper;

    const String name;

@@ -580,6 +580,7 @@ inline String directoryPath(const String & path)
    return fs::path(path).parent_path() / "";
}


}

template <>

@@ -21,7 +21,7 @@ namespace ErrorCodes
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
    /// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
    if (!settings.enable_filesystem_cache)
    if (!settings.enable_filesystem_cache && !settings.read_through_distributed_cache)
        return settings.remote_fs_buffer_size;

    /// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
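A hedged sketch of the decision this function makes (field names and defaults are simplified stand-ins for ReadSettings, and the clamping branch is paraphrased from the comment above rather than copied from the real implementation):

#include <algorithm>
#include <cstddef>
#include <iostream>

struct ReadSettingsSketch
{
    bool enable_filesystem_cache = false;
    bool read_through_distributed_cache = false;
    size_t remote_fs_buffer_size = 1 * 1024 * 1024;   // illustrative default
    size_t prefetch_buffer_size = 4 * 1024 * 1024;    // illustrative default
};

size_t chooseBufferSizeSketch(const ReadSettingsSketch & settings, size_t file_size)
{
    // Only when a cache is in the read path can we download bigger portions of file
    // segments than the current task will actually read.
    if (!settings.enable_filesystem_cache && !settings.read_through_distributed_cache)
        return settings.remote_fs_buffer_size;

    // Prefetch / pre-download buffers should be large enough, but never bigger than the file.
    return std::min(settings.prefetch_buffer_size, file_size);
}

int main()
{
    ReadSettingsSketch plain;                  // no cache: plain remote buffer size
    ReadSettingsSketch cached;
    cached.enable_filesystem_cache = true;     // cache: clamp to the file size

    std::cout << chooseBufferSizeSketch(plain, 100) << ' '
              << chooseBufferSizeSketch(cached, 100) << '\n';   // 1048576 100
}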
@@ -56,6 +56,8 @@ public:

    void deserialize(ReadBuffer & buf);
    void deserializeFromString(const std::string & data);
    /// This method was deleted from public fork recently by Azat
    void createFromSingleObject(ObjectStorageKey object_key, size_t bytes_size, size_t ref_count_, bool is_read_only_);

    void serialize(WriteBuffer & buf, bool sync) const;
    std::string serializeToString() const;

@@ -9,6 +9,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/getObjectInfo.h>

@@ -146,4 +146,9 @@ bool ReadBufferFromPocoSocketBase::poll(size_t timeout_microseconds) const
    return res;
}

void ReadBufferFromPocoSocketBase::setReceiveTimeout(size_t receive_timeout_microseconds)
{
    socket.setReceiveTimeout(Poco::Timespan(receive_timeout_microseconds, 0));
}

}

@@ -34,6 +34,8 @@ public:

    ssize_t socketReceiveBytesImpl(char * ptr, size_t size);

    void setReceiveTimeout(size_t receive_timeout_microseconds);

private:
    AsyncCallback async_callback;
    std::string socket_description;

@@ -48,7 +48,6 @@ bool S3Exception::isRetryableError() const
}

}

namespace DB::ErrorCodes
{
    extern const int S3_ERROR;

@@ -20,6 +20,8 @@ namespace ActionLocks
    extern const StorageActionBlockType PullReplicationLog = 8;
    extern const StorageActionBlockType Cleanup = 9;
    extern const StorageActionBlockType ViewRefresh = 10;
    extern const StorageActionBlockType VirtualPartsUpdate = 11;
    extern const StorageActionBlockType ReduceBlockingParts = 12;
}

@@ -96,6 +96,7 @@ void BlobStorageLog::prepareTable()
        std::unique_lock lock{prepare_mutex};
        const auto & relative_data_path = merge_tree_table->getRelativeDataPath();
        prefix_to_ignore = normalizePath(relative_data_path);
        LOG_DEBUG(log, "Will ignore blobs with prefix {}", prefix_to_ignore);
    }
}

@@ -12,7 +12,7 @@ namespace DB

/// Based on the LRU algorithm implementation, the record with the lowest priority is stored at
/// the head of the queue, and the record with the highest priority is stored at the tail.
class LRUFileCachePriority final : public IFileCachePriority
class LRUFileCachePriority : public IFileCachePriority
{
protected:
    struct State

@@ -85,6 +85,8 @@ public:

    bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) override;

    FileCachePriorityPtr copy() const { return std::make_unique<LRUFileCachePriority>(max_size, max_elements, state); }

private:
    class LRUIterator;
    using LRUQueue = std::list<EntryPtr>;

@@ -72,7 +72,12 @@ public:

    bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) override;

    FileCachePriorityPtr copy() const { return std::make_unique<SLRUFileCachePriority>(max_size, max_elements, size_ratio, probationary_queue.state, protected_queue.state); }

private:
    using LRUIterator = LRUFileCachePriority::LRUIterator;
    using LRUQueue = std::list<Entry>;

    double size_ratio;
    LRUFileCachePriority protected_queue;
    LRUFileCachePriority probationary_queue;

@@ -1817,6 +1817,21 @@ void DatabaseCatalog::triggerReloadDisksTask(const Strings & new_added_disks)
    (*reload_disks_task)->schedule();
}

void DatabaseCatalog::stopReplicatedDDLQueries()
{
    replicated_ddl_queries_enabled = false;
}

void DatabaseCatalog::startReplicatedDDLQueries()
{
    replicated_ddl_queries_enabled = true;
}

bool DatabaseCatalog::canPerformReplicatedDDLQueries() const
{
    return replicated_ddl_queries_enabled;
}

static void maybeUnlockUUID(UUID uuid)
{
    if (uuid == UUIDHelpers::Nil)
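The three methods above implement a simple gate: an atomic flag that SYSTEM-style commands flip and DDL entry points consult before proceeding. A minimal standalone sketch of the pattern (class and names are illustrative; in ClickHouse the flag is the std::atomic<bool> replicated_ddl_queries_enabled member shown in the header hunk below):

#include <atomic>
#include <iostream>
#include <stdexcept>

// Illustrative stand-in for the DatabaseCatalog gate.
class ReplicatedDDLGate
{
public:
    void stop() { enabled = false; }
    void start() { enabled = true; }
    bool canPerform() const { return enabled; }

private:
    std::atomic<bool> enabled{false};   // disabled until startup decides otherwise
};

int main()
{
    ReplicatedDDLGate gate;

    try
    {
        if (!gate.canPerform())
            throw std::runtime_error("Replicated DDL queries are disabled");
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }

    gate.start();   // analogous to what startup does when no stop flag is found in ZooKeeper
    std::cout << std::boolalpha << gate.canPerform() << '\n';   // true
}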
@@ -266,6 +266,10 @@ public:

    void triggerReloadDisksTask(const Strings & new_added_disks);

    void stopReplicatedDDLQueries();
    void startReplicatedDDLQueries();
    bool canPerformReplicatedDDLQueries() const;

private:
    // The global instance of database catalog. unique_ptr is to allow
    // deferred initialization. Thought I'd use std::optional, but I can't

@@ -361,6 +365,8 @@ private:
    std::mutex reload_disks_mutex;
    std::set<String> disks_to_reload;
    static constexpr time_t DBMS_DEFAULT_DISK_RELOAD_PERIOD_SEC = 5;

    std::atomic<bool> replicated_ddl_queries_enabled = false;
};

@@ -82,6 +82,9 @@ private:

    AccessRightsElements getRequiredAccessForDDLOnCluster() const;
    void startStopAction(StorageActionBlockType action_type, bool start);

    void stopReplicatedDDLQueries();
    void startReplicatedDDLQueries();
};

@@ -40,7 +40,6 @@ class MutationsInterpreter
{
private:
    struct Stage;

public:
    struct Settings
    {

@@ -98,7 +98,6 @@ public:

    /// Closes and removes session
    void closeSession(const String & session_id);

private:
    std::shared_ptr<SessionLog> getSessionLog() const;
    ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;

@@ -19,6 +19,7 @@ Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_bloc
    , min_block_size_bytes(min_block_size_bytes_)
    , header(header_)
{
    LOG_TEST(getLogger("Squashing"), "header columns {}", header.columns());
}

Chunk Squashing::flush()

@@ -14,6 +14,7 @@
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "Parsers/ASTSystemQuery.h"
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>

@@ -93,6 +94,12 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
    if (!context->getSettingsRef()[Setting::allow_distributed_ddl])
        throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Distributed DDL queries are prohibited for the user");

    bool is_system_query = dynamic_cast<ASTSystemQuery *>(query_ptr.get()) != nullptr;
    bool replicated_ddl_queries_enabled = DatabaseCatalog::instance().canPerformReplicatedDDLQueries();

    if (!is_system_query && !replicated_ddl_queries_enabled)
        throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Replicated DDL queries are disabled");

    if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
    {
        for (const auto & command : query_alter->command_list->children)

@@ -99,6 +99,7 @@ namespace DB
    MR_MACROS(COMPRESSION, "COMPRESSION") \
    MR_MACROS(CONST, "CONST") \
    MR_MACROS(CONSTRAINT, "CONSTRAINT") \
    MR_MACROS(CONNECTIONS, "CONNECTIONS") \
    MR_MACROS(CREATE_POLICY, "CREATE POLICY") \
    MR_MACROS(CREATE_PROFILE, "CREATE PROFILE") \
    MR_MACROS(CREATE_QUOTA, "CREATE QUOTA") \

@@ -9,6 +9,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/SipHash.h>
#include <Common/StringUtils.h>

#include <algorithm>

namespace DB

@@ -53,6 +53,9 @@ PlacementInfo & PlacementInfo::instance()
void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config)
try
{
    if (initialized)
        return;

    if (!config.has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
    {
        availability_zone = "";

@@ -22,6 +22,7 @@ struct FutureMergedMutatedPart
    MergeTreeDataPartFormat part_format;
    MergeTreePartInfo part_info;
    MergeTreeData::DataPartsVector parts;
    std::vector<MergeTreePartInfo> blocking_parts_to_remove;
    MergeType merge_type = MergeType::Regular;

    const MergeTreePartition & getPartition() const { return parts.front()->partition; }

@@ -18,6 +18,7 @@ public:
    using ValueSizeMap = std::map<std::string, double>;
    using VirtualFields = std::unordered_map<String, Field>;
    using DeserializeBinaryBulkStateMap = std::map<std::string, ISerialization::DeserializeBinaryBulkStatePtr>;
    using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;

    IMergeTreeReader(
        MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,

@@ -83,6 +83,9 @@ bool MergeProjectionPartsTask::executeStep()
            ".tmp_proj");

        next_level_parts.push_back(executeHere(tmp_part_merge_task));
        /// FIXME (alesapin) we should use some temporary storage for this,
        /// not commit each subprojection part
        next_level_parts.back()->getDataPartStorage().commitTransaction();
        next_level_parts.back()->is_temp = true;
    }

@@ -78,7 +78,7 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::select(

        ++right;

        if (partition[right].level < partition[left].level)
        if (right < partition.size() && partition[right].level < partition[left].level)
            left = right;
    }

@@ -8,7 +8,7 @@ namespace DB

STRONG_TYPEDEF(UInt32, MergeTreeDataFormatVersion)

const MergeTreeDataFormatVersion MERGE_TREE_DATA_OLD_FORMAT_VERSION {0};
const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1};
static constexpr MergeTreeDataFormatVersion MERGE_TREE_DATA_OLD_FORMAT_VERSION {0};
static constexpr MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1};

}

@@ -106,9 +106,11 @@ public:
        PreformattedMessage & out_disable_reason,
        bool dry_run = false);

    /// Actually the most fresh partition with biggest modification_time
    String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;

    /// Useful to quickly get a list of partitions that contain parts that we may want to merge
    /// The result is limited by top_number_of_partitions_to_consider_for_merge
    PartitionIdsHint getPartitionsThatMayBeMerged(
        size_t max_total_size_to_merge,
        const AllowedMergingPredicate & can_merge_callback,

@@ -45,6 +45,7 @@ public:
    enum Value
    {
        Full,
        Packed,
        Unknown,
    };

@@ -179,8 +179,8 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Can't take information about index granularity from blocks, when non empty index_granularity array specified");

    if (!getDataPartStorage().exists())
        getDataPartStorage().createDirectories();
    /// We don't need to check if it exists or not, createDirectories doesn't throw
    getDataPartStorage().createDirectories();

    if (settings.rewrite_primary_key)
        initPrimaryIndex();
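The replacement above relies on directory creation being idempotent, so the exists() pre-check adds nothing. The same property holds for std::filesystem, which the standalone illustration below uses (it is not the IDataPartStorage API):

#include <filesystem>
#include <iostream>

int main()
{
    namespace fs = std::filesystem;
    const fs::path dir = "tmp_example/a/b";

    bool first = fs::create_directories(dir);    // true: directories were created
    bool second = fs::create_directories(dir);   // false: already present, and no exception is thrown
    std::cout << std::boolalpha << first << ' ' << second << '\n';

    fs::remove_all("tmp_example");               // clean up the illustration
}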
@@ -108,6 +108,14 @@ std::optional<MarkType> MergeTreeIndexGranularityInfo::getMarksTypeFromFilesyste
    return {};
}

MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
    MarkType mark_type_, size_t index_granularity_, size_t index_granularity_bytes_)
    : mark_type(mark_type_)
    , fixed_index_granularity(index_granularity_)
    , index_granularity_bytes(index_granularity_bytes_)
{
}

MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_)
    : MergeTreeIndexGranularityInfo(storage, {storage.canUseAdaptiveGranularity(), (*storage.getSettings())[MergeTreeSetting::compress_marks], type_.getValue()})
{

@@ -49,6 +49,7 @@ public:
    MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_);

    MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_);
    MergeTreeIndexGranularityInfo(MarkType mark_type_, size_t index_granularity_, size_t index_granularity_bytes_);

    void changeGranularityIfRequired(const IDataPartStorage & data_part_storage);

@@ -26,11 +26,11 @@ void checkMutationStatus(std::optional<MergeTreeMutationStatus> & status, const
        throw Exception(
            ErrorCodes::UNFINISHED,
            "Exception happened during execution of mutation{} '{}' with part '{}' reason: '{}'. This error maybe retryable or not. "
            "In case of unretryable error, mutation can be killed with KILL MUTATION query",
            "In case of unretryable error, mutation can be killed with KILL MUTATION query \n\n{}\n",
            mutation_ids.size() > 1 ? "s" : "",
            boost::algorithm::join(mutation_ids, ", "),
            status->latest_failed_part,
            status->latest_fail_reason);
            status->latest_fail_reason, StackTrace().toString());
    }
}

@@ -46,6 +46,13 @@ struct MergeTreePartInfo
            < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
    }

    bool operator>(const MergeTreePartInfo & rhs) const
    {
        return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
            > std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
    }


    bool operator==(const MergeTreePartInfo & rhs) const
    {
        return !(*this != rhs);
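The new operator> mirrors operator< by comparing the same field tuple; std::forward_as_tuple (or std::tie) gives lexicographic ordering over the listed members. A standalone sketch of the pattern with a reduced, illustrative key:

#include <cassert>
#include <string>
#include <tuple>

struct PartKeySketch
{
    std::string partition_id;
    long min_block = 0;
    long max_block = 0;
    unsigned level = 0;

    bool operator<(const PartKeySketch & rhs) const
    {
        return std::tie(partition_id, min_block, max_block, level)
            < std::tie(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level);
    }

    bool operator>(const PartKeySketch & rhs) const
    {
        return std::tie(partition_id, min_block, max_block, level)
            > std::tie(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level);
    }
};

int main()
{
    PartKeySketch a{"all", 0, 0, 0};
    PartKeySketch b{"all", 0, 0, 1};
    assert(a < b && b > a && !(a > b));   // ordered lexicographically by the listed fields
}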
@@ -35,7 +35,7 @@ struct PrewhereExprStep
    bool remove_filter_column = false;
    bool need_filter = false;

    /// Some PREWHERE steps should be executed without conversions.
    /// Some PREWHERE steps should be executed without conversions (e.g. early mutation steps)
    /// A step without alter conversion cannot be executed after step with alter conversions.
    bool perform_alter_conversions = false;
};

@@ -3,8 +3,8 @@
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/InsertBlockInfo.h>
#include <Interpreters/PartLog.h>
#include <Common/Exception.h>
#include <Processors/Transforms/DeduplicationTokenTransforms.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SipHash.h>

@@ -135,7 +135,6 @@ bool isRetryableException(std::exception_ptr exception_ptr)
    }
}


static IMergeTreeDataPart::Checksums checkDataPart(
    MergeTreeData::DataPartPtr data_part,
    const IDataPartStorage & data_part_storage,

@@ -422,6 +421,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
    }

    ReadSettings read_settings;
    read_settings.read_through_distributed_cache = false;
    read_settings.enable_filesystem_cache = false;
    read_settings.enable_filesystem_cache_log = false;
    read_settings.enable_filesystem_read_prefetches_log = false;

@@ -5,6 +5,7 @@
#if USE_AWS_S3
#include <IO/S3Settings.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Parsers/IAST_fwd.h>

namespace DB
{

@@ -150,6 +150,7 @@ size_t estimateValueSize(
    }
}

}

ColumnPtr fillColumnWithRandomData(
    const DataTypePtr type,

@@ -539,6 +540,8 @@ ColumnPtr fillColumnWithRandomData(
    }
}

namespace
{

class GenerateSource : public ISource
{

@@ -17,6 +17,8 @@ struct StorageID;
class ASTCreateQuery;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;

/// Helper for replicated tables that use zookeeper for coordination among replicas.
/// Handles things like:
@@ -1,138 +0,0 @@
#!/usr/bin/env python3


import json
import random
import string
import time

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def get_random_string(length):
    return "".join(
        random.choice(string.ascii_uppercase + string.digits) for _ in range(length)
    )


def test_system_replicated_fetches(started_cluster):
    node1.query(
        "CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()"
    )
    node2.query(
        "CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()"
    )

    with PartitionManager() as pm:
        node2.query("SYSTEM STOP FETCHES t")
        node1.query(
            "INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(
                get_random_string(104857)
            )
        )
        pm.add_network_delay(node1, 80)
        node2.query("SYSTEM START FETCHES t")
        fetches_result = []
        for _ in range(1000):
            result = json.loads(
                node2.query("SELECT * FROM system.replicated_fetches FORMAT JSON")
            )
            if not result["data"]:
                if fetches_result:
                    break
                time.sleep(0.1)
            else:
                fetches_result.append(result["data"][0])
                print(fetches_result[-1])
                time.sleep(0.1)

    node2.query("SYSTEM SYNC REPLICA t", timeout=10)
    assert node2.query("SELECT COUNT() FROM t") == "10000\n"

    for elem in fetches_result:
        elem["bytes_read_compressed"] = float(elem["bytes_read_compressed"])
        elem["total_size_bytes_compressed"] = float(elem["total_size_bytes_compressed"])
        elem["progress"] = float(elem["progress"])
        elem["elapsed"] = float(elem["elapsed"])

    assert len(fetches_result) > 0
    first_non_empty = fetches_result[0]

    assert first_non_empty["database"] == "default"
    assert first_non_empty["table"] == "t"
    assert first_non_empty["source_replica_hostname"] == "node1"
    assert first_non_empty["source_replica_port"] == 9009
    assert first_non_empty["source_replica_path"] == "/clickhouse/test/t/replicas/1"
    assert first_non_empty["interserver_scheme"] == "http"
    assert first_non_empty["result_part_name"] == "all_0_0_0"
    assert first_non_empty["result_part_path"].startswith("/var/lib/clickhouse/")
    assert first_non_empty["result_part_path"].endswith("all_0_0_0/")
    assert first_non_empty["partition_id"] == "all"
    assert first_non_empty["URI"].startswith(
        "http://node1:9009/?endpoint=DataPartsExchange"
    )

    for elem in fetches_result:
        # FIXME https://github.com/ClickHouse/ClickHouse/issues/45435
        # assert (
        #     elem["bytes_read_compressed"] <= elem["total_size_bytes_compressed"]
        # ), "Bytes read ({}) more than total bytes ({}). It's a bug".format(
        #     elem["bytes_read_compressed"], elem["total_size_bytes_compressed"]
        # )
        # assert (
        #     0.0 <= elem["progress"] <= 1.0
        # ), "Progress shouldn't less than 0 and bigger than 1, got {}".format(
        #     elem["progress"]
        # )
        assert (
            0.0 <= elem["elapsed"]
        ), "Elapsed time must be greater than 0, got {}".format(elem["elapsed"])

    prev_progress = first_non_empty["progress"]
    for elem in fetches_result:
        assert (
            elem["progress"] >= prev_progress
        ), "Progress decreasing prev{}, next {}? It's a bug".format(
            prev_progress, elem["progress"]
        )
        prev_progress = elem["progress"]

    prev_bytes = first_non_empty["bytes_read_compressed"]
    for elem in fetches_result:
        assert (
            elem["bytes_read_compressed"] >= prev_bytes
        ), "Bytes read decreasing prev {}, next {}? It's a bug".format(
            prev_bytes, elem["bytes_read_compressed"]
        )
        prev_bytes = elem["bytes_read_compressed"]

    prev_elapsed = first_non_empty["elapsed"]
    for elem in fetches_result:
        assert (
            elem["elapsed"] >= prev_elapsed
        ), "Elapsed time decreasing prev {}, next {}? It's a bug".format(
            prev_elapsed, elem["elapsed"]
        )
        prev_elapsed = elem["elapsed"]

    node1.query("DROP TABLE IF EXISTS t SYNC")
    node2.query("DROP TABLE IF EXISTS t SYNC")
@@ -142,6 +142,7 @@ SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED
SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS'] \N SYSTEM
SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP REPLICATION QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM
SYSTEM VIRTUAL PARTS UPDATE ['SYSTEM STOP VIRTUAL PARTS UPDATE','SYSTEM START VIRTUAL PARTS UPDATE','STOP VIRTUAL PARTS UPDATE','START VIRTUAL PARTS UPDATE'] TABLE SYSTEM
SYSTEM REDUCE BLOCKING PARTS ['SYSTEM STOP REDUCE BLOCKING PARTS','SYSTEM START REDUCE BLOCKING PARTS','STOP REDUCE BLOCKING PARTS','START REDUCE BLOCKING PARTS'] TABLE SYSTEM
SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM
SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM
SYSTEM REPLICA READINESS ['SYSTEM REPLICA READY','SYSTEM REPLICA UNREADY'] GLOBAL SYSTEM

@@ -6,4 +6,3 @@
2020-01-01 00:00:00 2
1
499999
18

@@ -1,5 +1,4 @@
-- Tags: no-parallel, no-fasttest
-- no-parallel: it checks the number of threads, which can be lowered in presence of other queries
-- Tags: no-fasttest

DROP TABLE IF EXISTS select_final;

@@ -32,17 +31,7 @@ SELECT max(x) FROM select_final FINAL where string = 'updated';
TRUNCATE TABLE select_final;

INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(500000);

OPTIMIZE TABLE select_final FINAL;

SET remote_filesystem_read_method = 'read';
SET local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;

SELECT max(x) FROM select_final FINAL;

SYSTEM FLUSH LOGS;

SELECT length(thread_ids) FROM system.query_log WHERE query='SELECT max(x) FROM select_final FINAL;' AND type='QueryFinish' AND current_database = currentDatabase() ORDER BY event_time DESC LIMIT 1;

DROP TABLE select_final;