Merge remote-tracking branch 'blessed/master' into backup_1
Commit: 546484d46b
@ -53,7 +53,6 @@ clickhouse-benchmark [keys] < queries_file;
- `--confidence=N` — Level of confidence for T-test. Possible values: 0 (80%), 1 (90%), 2 (95%), 3 (98%), 4 (99%), 5 (99.5%). Default value: 5. In the [comparison mode](#clickhouse-benchmark-comparison-mode) `clickhouse-benchmark` performs the [Independent two-sample Student’s t-test](https://en.wikipedia.org/wiki/Student%27s_t-test#Independent_two-sample_t-test) to determine whether the two distributions aren’t different with the selected level of confidence.
- `--cumulative` — Printing cumulative data instead of data per interval.
- `--database=DATABASE_NAME` — ClickHouse database name. Default value: `default`.
- `--json=FILEPATH` — `JSON` output. When the key is set, `clickhouse-benchmark` outputs a report to the specified JSON-file.
- `--user=USERNAME` — ClickHouse user name. Default value: `default`.
- `--password=PSWD` — ClickHouse user password. Default value: empty string.
- `--stacktrace` — Stack traces output. When the key is set, `clickhouse-benchmark` outputs stack traces of exceptions.
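For illustration, comparison mode is enabled by passing two servers via repeated `--host` keys; a hypothetical invocation exercising `--confidence` might look like this (host names are placeholders):

```bash
# Compare two servers on the same query stream and report, at the 99.5%
# confidence level (--confidence=5), whether their latency distributions differ.
clickhouse-benchmark --host=server-a --host=server-b --confidence=5 --cumulative < queries_file
```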
@ -183,6 +183,7 @@ enum class AccessType
|
||||
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
|
||||
M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
|
||||
M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
|
||||
M(SYSTEM_REPLICA_READINESS, "SYSTEM REPLICA READY, SYSTEM REPLICA UNREADY", GLOBAL, SYSTEM) \
|
||||
M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \
|
||||
M(SYSTEM_RESTORE_REPLICA, "RESTORE REPLICA", TABLE, SYSTEM) \
|
||||
M(SYSTEM_WAIT_LOADING_PARTS, "WAIT LOADING PARTS", TABLE, SYSTEM) \
|
||||
|
@ -519,8 +519,9 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
|
||||
if (attr_nodes["from_zk"]) /// we have zookeeper subst
|
||||
{
|
||||
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
|
||||
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_zk substitution");
|
||||
/// only allow substitution for nodes with no value and without "replace"
|
||||
if (node->hasChildNodes() && !replace)
|
||||
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_zk substitution");
|
||||
|
||||
contributing_zk_paths.insert(attr_nodes["from_zk"]->getNodeValue());
|
||||
|
||||
@ -544,8 +545,9 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
|
||||
if (attr_nodes["from_env"]) /// we have env subst
|
||||
{
|
||||
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
|
||||
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_env substitution");
|
||||
/// only allow substitution for nodes with no value and without "replace"
|
||||
if (node->hasChildNodes() && !replace)
|
||||
throw Poco::Exception("Element <" + node->nodeName() + "> has value and does not have 'replace' attribute, can't process from_env substitution");
|
||||
|
||||
XMLDocumentPtr env_document;
|
||||
auto get_env_node = [&](const std::string & name) -> const Node *
|
||||
|
@ -260,6 +260,7 @@
|
||||
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M)
|
||||
#endif
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
#define M(NAME, DOCUMENTATION) extern const Metric NAME = Metric(__COUNTER__);
|
||||
|
@ -99,6 +99,7 @@ struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
|
||||
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
|
||||
{
|
||||
TestKeeperGetRequest() = default;
|
||||
explicit TestKeeperGetRequest(const GetRequest & base) : GetRequest(base) {}
|
||||
ResponsePtr createResponse() const override;
|
||||
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||
};
|
||||
@ -118,6 +119,8 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
|
||||
|
||||
struct TestKeeperListRequest : ListRequest, TestKeeperRequest
|
||||
{
|
||||
TestKeeperListRequest() = default;
|
||||
explicit TestKeeperListRequest(const ListRequest & base) : ListRequest(base) {}
|
||||
ResponsePtr createResponse() const override;
|
||||
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||
};
|
||||
@ -176,6 +179,14 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
|
||||
}
|
||||
else if (const auto * concrete_request_get = dynamic_cast<const GetRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperGetRequest>(*concrete_request_get));
|
||||
}
|
||||
else if (const auto * concrete_request_list = dynamic_cast<const ListRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperListRequest>(*concrete_request_list));
|
||||
}
|
||||
else
|
||||
throw Exception::fromMessage(Error::ZBADARGUMENTS, "Illegal command as part of multi ZooKeeper request");
|
||||
}
|
||||
|
@ -497,6 +497,17 @@ bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, cons
|
||||
return existsWatch(path, stat, callbackForEvent(watch));
|
||||
}
|
||||
|
||||
bool ZooKeeper::anyExists(const std::vector<std::string> & paths)
|
||||
{
|
||||
auto exists_multi_response = exists(paths);
|
||||
for (size_t i = 0; i < exists_multi_response.size(); ++i)
|
||||
{
|
||||
if (exists_multi_response[i].error == Coordination::Error::ZOK)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
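The new `anyExists` helper issues one batched `exists` request and returns true as soon as any of the given paths is present. A minimal usage sketch (the wrapper function and the paths are hypothetical):

```cpp
#include <string>
#include <vector>
#include <Common/ZooKeeper/ZooKeeper.h>

/// Sketch: probe several candidate znodes with a single batched request
/// instead of looping over exists() manually.
bool anyCandidatePresent(const zkutil::ZooKeeperPtr & zookeeper)
{
    const std::vector<std::string> candidates = {"/clickhouse/task_a", "/clickhouse/task_b"}; /// hypothetical paths
    return zookeeper->anyExists(candidates);
}
```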
|
||||
bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
|
||||
{
|
||||
Coordination::Error code = existsImpl(path, stat, watch_callback);
|
||||
|
@ -290,6 +290,8 @@ public:
|
||||
return exists(paths.begin(), paths.end());
|
||||
}
|
||||
|
||||
bool anyExists(const std::vector<std::string> & paths);
|
||||
|
||||
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
|
||||
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
|
||||
|
||||
@ -426,8 +428,9 @@ public:
|
||||
/// Performs several operations in a transaction.
|
||||
/// Throws on every error.
|
||||
Coordination::Responses multi(const Coordination::Requests & requests);
|
||||
/// Throws only if some operation has returned an "unexpected" error
|
||||
/// - an error that would cause the corresponding try- method to throw.
|
||||
/// Throws only if some operation has returned an "unexpected" error - an error that would cause
|
||||
/// the corresponding try- method to throw.
|
||||
/// On exception, `responses` may or may not be populated.
|
||||
Coordination::Error tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
|
||||
/// Throws nothing (even session expired errors)
|
||||
Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
|
||||
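A sketch of how the `tryMulti` contract described in these comments is typically consumed; the paths and payloads are hypothetical, and error handling is reduced to the ZNODEEXISTS case:

```cpp
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>

/// Sketch: "expected" errors such as ZNODEEXISTS come back as a return code,
/// while "unexpected" ones (e.g. a lost session) are thrown by tryMulti itself.
void createPairIfAbsent(const zkutil::ZooKeeperPtr & zookeeper)
{
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeCreateRequest("/demo/a", "payload", zkutil::CreateMode::Persistent)); /// hypothetical paths
    ops.emplace_back(zkutil::makeCreateRequest("/demo/b", "payload", zkutil::CreateMode::Persistent));

    Coordination::Responses responses;
    Coordination::Error code = zookeeper->tryMulti(ops, responses);

    if (code == Coordination::Error::ZNODEEXISTS)
        return; /// another replica created the nodes first - nothing to do
    zkutil::KeeperMultiException::check(code, ops, responses); /// no-op on ZOK, throws otherwise
}
```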
@ -571,8 +574,11 @@ public:
|
||||
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
|
||||
|
||||
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
|
||||
|
||||
uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
|
||||
|
||||
void setServerCompletelyStarted();
|
||||
|
||||
Int8 getConnectedHostIdx() const;
|
||||
|
@ -208,6 +208,9 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
return;
|
||||
}
|
||||
|
||||
/// To avoid reference to binding
|
||||
const auto & snapshot_path_ref = snapshot_path;
|
||||
|
||||
SCOPE_EXIT(
|
||||
{
|
||||
LOG_INFO(log, "Removing lock file");
|
||||
@ -223,7 +226,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
|
||||
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path_ref);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
});
|
||||
|
@ -35,6 +35,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
struct ContextSharedPart : boost::noncopyable
|
||||
@ -376,4 +377,9 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr
|
||||
shared->keeper_dispatcher->updateConfiguration(getConfigRef(), getMacros());
|
||||
}
|
||||
|
||||
std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const
|
||||
{
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot connect to ZooKeeper from Keeper");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,6 +21,11 @@
|
||||
#include <memory>
|
||||
|
||||
#include "config.h"
|
||||
namespace zkutil
|
||||
{
|
||||
class ZooKeeper;
|
||||
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -153,6 +158,8 @@ public:
|
||||
void initializeKeeperDispatcher(bool start_async) const;
|
||||
void shutdownKeeperDispatcher() const;
|
||||
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -133,6 +133,8 @@ enum class DefaultTableEngine
|
||||
ReplacingMergeTree,
|
||||
ReplicatedMergeTree,
|
||||
ReplicatedReplacingMergeTree,
|
||||
SharedMergeTree,
|
||||
SharedReplacingMergeTree,
|
||||
Memory,
|
||||
};
|
||||
|
||||
|
@ -69,12 +69,6 @@ DictionaryPtr DictionaryFactory::create(
|
||||
layout_type);
|
||||
}
|
||||
|
||||
DictionaryPtr DictionaryFactory::create(const std::string & name, const ASTCreateQuery & ast, ContextPtr global_context) const
|
||||
{
|
||||
auto configuration = getDictionaryConfigurationFromAST(ast, global_context);
|
||||
return DictionaryFactory::create(name, *configuration, "dictionary", global_context, true);
|
||||
}
|
||||
|
||||
bool DictionaryFactory::isComplex(const std::string & layout_type) const
|
||||
{
|
||||
auto it = registered_layouts.find(layout_type);
|
||||
|
@ -39,11 +39,6 @@ public:
|
||||
ContextPtr global_context,
|
||||
bool created_from_ddl) const;
|
||||
|
||||
/// Create dictionary from DDL-query
|
||||
DictionaryPtr create(const std::string & name,
|
||||
const ASTCreateQuery & ast,
|
||||
ContextPtr global_context) const;
|
||||
|
||||
using LayoutCreateFunction = std::function<DictionaryPtr(
|
||||
const std::string & name,
|
||||
const DictionaryStructure & dict_struct,
|
||||
|
@ -10,6 +10,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// It's a bug in clang with three-way comparison operator
|
||||
/// https://github.com/llvm/llvm-project/issues/55919
|
||||
#ifdef __clang__
|
||||
#pragma clang diagnostic push
|
||||
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
|
||||
#endif
|
||||
|
||||
/** Mark is the position in the compressed file. The compressed file consists of adjacent compressed blocks.
|
||||
* Mark is a tuple - the offset in the file to the start of the compressed block, the offset in the decompressed block to the start of the data.
|
||||
*/
|
||||
@ -18,12 +25,7 @@ struct MarkInCompressedFile
|
||||
size_t offset_in_compressed_file;
|
||||
size_t offset_in_decompressed_block;
|
||||
|
||||
bool operator==(const MarkInCompressedFile & rhs) const
|
||||
{
|
||||
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
|
||||
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
|
||||
}
|
||||
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
|
||||
auto operator<=>(const MarkInCompressedFile &) const = default;
|
||||
|
||||
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
|
||||
|
||||
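For context, the hand-written `operator==`/`operator!=` replaced above are subsumed by the defaulted three-way comparison (guarded by the clang pragma because of the linked compiler issue). A standalone sketch of the pattern, with a hypothetical struct name:

```cpp
#include <compare>
#include <cstddef>

struct MarkSketch
{
    size_t offset_in_compressed_file;
    size_t offset_in_decompressed_block;

    /// A single defaulted operator<=> (plus the implicitly defaulted operator==)
    /// provides member-wise ==, !=, <, <=, >, >=.
    auto operator<=>(const MarkSketch &) const = default;
};

static_assert(MarkSketch{1, 2} == MarkSketch{1, 2});
static_assert(MarkSketch{1, 2} < MarkSketch{1, 3});
```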
@ -39,6 +41,10 @@ struct MarkInCompressedFile
|
||||
}
|
||||
};
|
||||
|
||||
#ifdef __clang__
|
||||
#pragma clang diagnostic pop
|
||||
#endif
|
||||
|
||||
/**
|
||||
* In-memory representation of an array of marks.
|
||||
*
|
||||
|
@ -515,7 +515,9 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
|
||||
// We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
|
||||
constexpr ResourceCost estimated_cost = 1;
|
||||
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
|
||||
|
||||
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
|
||||
|
||||
rlock.unlock();
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
|
@ -13,9 +13,9 @@
|
||||
namespace DB::S3
|
||||
{
|
||||
std::shared_ptr<Aws::Http::HttpClient>
|
||||
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const
|
||||
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const
|
||||
{
|
||||
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(clientConfiguration));
|
||||
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(client_configuration));
|
||||
}
|
||||
|
||||
std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest(
|
||||
|
@ -15,7 +15,7 @@ class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory
|
||||
public:
|
||||
~PocoHTTPClientFactory() override = default;
|
||||
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient>
|
||||
CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override;
|
||||
CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const override;
|
||||
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
|
||||
CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
|
||||
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
|
||||
|
@ -655,6 +655,7 @@ namespace
|
||||
|
||||
void performCopy()
|
||||
{
|
||||
LOG_TEST(log, "Copy object {} to {} using native copy", src_key, dest_key);
|
||||
if (!supports_multipart_copy || size <= upload_settings.max_single_operation_copy_size)
|
||||
performSingleOperationCopy();
|
||||
else
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <Common/Throttler_fwd.h>
|
||||
|
||||
#include <IO/S3/URI.h>
|
||||
#include <IO/S3/Credentials.h>
|
||||
|
||||
#include <aws/core/Aws.h>
|
||||
#include <aws/s3/S3Errors.h>
|
||||
|
@ -45,6 +45,7 @@
|
||||
#include <Access/Common/AllowedClientHosts.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Databases/DatabaseReplicated.h>
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
#include <Storages/StorageDistributed.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/Freeze.h>
|
||||
@ -92,6 +93,7 @@ namespace ErrorCodes
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int TABLE_WAS_NOT_DROPPED;
|
||||
extern const int ABORTED;
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
}
|
||||
|
||||
|
||||
@ -442,6 +444,10 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
result.pipeline = QueryPipeline(std::move(source));
|
||||
break;
|
||||
}
|
||||
case Type::DROP_DISK_METADATA_CACHE:
|
||||
{
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
}
|
||||
case Type::DROP_SCHEMA_CACHE:
|
||||
{
|
||||
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
|
||||
@ -611,6 +617,10 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
case Type::SYNC_DATABASE_REPLICA:
|
||||
syncReplicatedDatabase(query);
|
||||
break;
|
||||
case Type::REPLICA_UNREADY:
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
case Type::REPLICA_READY:
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
case Type::SYNC_TRANSACTION_LOG:
|
||||
syncTransactionLog();
|
||||
break;
|
||||
@ -1119,6 +1129,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
|
||||
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
|
||||
break;
|
||||
}
|
||||
case Type::DROP_DISK_METADATA_CACHE:
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
case Type::RELOAD_DICTIONARY:
|
||||
case Type::RELOAD_DICTIONARIES:
|
||||
case Type::RELOAD_EMBEDDED_DICTIONARIES:
|
||||
@ -1245,6 +1257,9 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
|
||||
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.getDatabase(), query.getTable());
|
||||
break;
|
||||
}
|
||||
case Type::REPLICA_READY:
|
||||
case Type::REPLICA_UNREADY:
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
case Type::RESTART_REPLICA:
|
||||
{
|
||||
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.getDatabase(), query.getTable());
|
||||
|
@ -57,6 +57,7 @@ private:
|
||||
void restartReplica(const StorageID & replica, ContextMutablePtr system_context);
|
||||
void restartReplicas(ContextMutablePtr system_context);
|
||||
void syncReplica(ASTSystemQuery & query);
|
||||
void setReplicaReadiness(bool ready);
|
||||
void waitLoadingParts();
|
||||
|
||||
void syncReplicatedDatabase(ASTSystemQuery & query);
|
||||
|
@ -244,6 +244,31 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
#ifdef PRINT_ASSEMBLY
|
||||
|
||||
class AssemblyPrinter
|
||||
{
|
||||
public:
|
||||
explicit AssemblyPrinter(llvm::TargetMachine &target_machine_)
|
||||
: target_machine(target_machine_)
|
||||
{
|
||||
}
|
||||
|
||||
void print(llvm::Module & module)
|
||||
{
|
||||
llvm::legacy::PassManager pass_manager;
|
||||
target_machine.Options.MCOptions.AsmVerbose = true;
|
||||
if (target_machine.addPassesToEmitFile(pass_manager, llvm::errs(), nullptr, llvm::CodeGenFileType::CGFT_AssemblyFile))
|
||||
throw Exception(ErrorCodes::CANNOT_COMPILE_CODE, "MachineCode cannot be printed");
|
||||
|
||||
pass_manager.run(module);
|
||||
}
|
||||
private:
|
||||
llvm::TargetMachine & target_machine;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
/** MemoryManager for module.
|
||||
* Keep total allocated size during RuntimeDyld linker execution.
|
||||
*/
|
||||
@ -375,6 +400,11 @@ CHJIT::CompiledModule CHJIT::compileModule(std::unique_ptr<llvm::Module> module)
|
||||
{
|
||||
runOptimizationPassesOnModule(*module);
|
||||
|
||||
#ifdef PRINT_ASSEMBLY
|
||||
AssemblyPrinter assembly_printer(*machine);
|
||||
assembly_printer.print(*module);
|
||||
#endif
|
||||
|
||||
auto buffer = compiler->compile(*module);
|
||||
|
||||
llvm::Expected<std::unique_ptr<llvm::object::ObjectFile>> object = llvm::object::ObjectFile::createObjectFile(*buffer);
|
||||
|
@ -179,7 +179,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|
||||
|| type == Type::RELOAD_DICTIONARY
|
||||
|| type == Type::RELOAD_MODEL
|
||||
|| type == Type::RELOAD_FUNCTION
|
||||
|| type == Type::RESTART_DISK)
|
||||
|| type == Type::RESTART_DISK
|
||||
|| type == Type::DROP_DISK_METADATA_CACHE)
|
||||
{
|
||||
if (table)
|
||||
{
|
||||
|
@ -32,6 +32,7 @@ public:
|
||||
DROP_COMPILED_EXPRESSION_CACHE,
|
||||
#endif
|
||||
DROP_FILESYSTEM_CACHE,
|
||||
DROP_DISK_METADATA_CACHE,
|
||||
DROP_SCHEMA_CACHE,
|
||||
DROP_FORMAT_SCHEMA_CACHE,
|
||||
#if USE_AWS_S3
|
||||
@ -49,6 +50,8 @@ public:
|
||||
SYNC_DATABASE_REPLICA,
|
||||
SYNC_TRANSACTION_LOG,
|
||||
SYNC_FILE_CACHE,
|
||||
REPLICA_READY,
|
||||
REPLICA_UNREADY,
|
||||
RELOAD_DICTIONARY,
|
||||
RELOAD_DICTIONARIES,
|
||||
RELOAD_MODEL,
|
||||
|
@ -12,6 +12,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
}
|
||||
|
||||
[[nodiscard]] static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos,
|
||||
Expected & expected, bool require_table, bool allow_string_literal)
|
||||
{
|
||||
@ -427,6 +432,10 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
case Type::DROP_DISK_METADATA_CACHE:
|
||||
{
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
|
||||
}
|
||||
case Type::DROP_SCHEMA_CACHE:
|
||||
{
|
||||
if (ParserKeyword{"FOR"}.ignore(pos, expected))
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/OptimizedRegularExpression.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
@ -122,6 +123,17 @@ struct Pattern
|
||||
AggregateFunctionPtr function;
|
||||
Retentions retentions; /// Must be ordered by 'age' descending.
|
||||
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
|
||||
void updateHash(SipHash & hash) const
|
||||
{
|
||||
hash.update(rule_type);
|
||||
hash.update(regexp_str);
|
||||
hash.update(function->getName());
|
||||
for (const auto & r : retentions)
|
||||
{
|
||||
hash.update(r.age);
|
||||
hash.update(r.precision);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
bool operator==(const Pattern & a, const Pattern & b);
|
||||
@ -142,6 +154,21 @@ struct Params
|
||||
Graphite::Patterns patterns;
|
||||
Graphite::Patterns patterns_plain;
|
||||
Graphite::Patterns patterns_tagged;
|
||||
void updateHash(SipHash & hash) const
|
||||
{
|
||||
hash.update(path_column_name);
|
||||
hash.update(time_column_name);
|
||||
hash.update(value_column_name);
|
||||
hash.update(value_column_name);
|
||||
hash.update(version_column_name);
|
||||
hash.update(patterns_typed);
|
||||
for (const auto & p : patterns)
|
||||
p.updateHash(hash);
|
||||
for (const auto & p : patterns_plain)
|
||||
p.updateHash(hash);
|
||||
for (const auto & p : patterns_tagged)
|
||||
p.updateHash(hash);
|
||||
}
|
||||
};
|
||||
|
||||
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
|
||||
|
@ -1061,8 +1061,13 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
response.setChunkedTransferEncoding(true);
|
||||
|
||||
HTMLForm params(default_settings, request);
|
||||
with_stacktrace = params.getParsed<bool>("stacktrace", false);
|
||||
close_session = params.getParsed<bool>("close_session", false);
|
||||
|
||||
if (params.getParsed<bool>("stacktrace", false) && server.config().getBool("enable_http_stacktrace", true))
|
||||
with_stacktrace = true;
|
||||
|
||||
if (params.getParsed<bool>("close_session", false) && server.config().getBool("enable_http_close_session", true))
|
||||
close_session = true;
|
||||
|
||||
if (close_session)
|
||||
session_id = params.get("session_id");
|
||||
|
||||
|
@ -28,11 +28,17 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
{
|
||||
HTMLForm params(getContext()->getSettingsRef(), request);
|
||||
|
||||
/// Even if lag is small, output detailed information about the lag.
|
||||
bool verbose = params.get("verbose", "") == "1";
|
||||
const auto & config = getContext()->getConfigRef();
|
||||
|
||||
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
|
||||
|
||||
/// Even if lag is small, output detailed information about the lag.
|
||||
bool verbose = false;
|
||||
bool enable_verbose = config.getBool("enable_verbose_replicas_status", true);
|
||||
|
||||
if (params.get("verbose", "") == "1" && enable_verbose)
|
||||
verbose = true;
|
||||
|
||||
bool ok = true;
|
||||
WriteBufferFromOwnString message;
|
||||
|
||||
@ -78,13 +84,13 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
}
|
||||
|
||||
const auto & config = getContext()->getConfigRef();
|
||||
setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT));
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
|
||||
verbose = true;
|
||||
if (enable_verbose)
|
||||
verbose = true;
|
||||
}
|
||||
|
||||
if (verbose)
|
||||
|
@ -144,6 +144,9 @@ bool ServerType::shouldStop(const std::string & port_name) const
|
||||
port_custom_name = port_name.substr(protocols_size, port_name.size() - protocols_size - ports_size + 1);
|
||||
}
|
||||
|
||||
else if (port_name == "cloud.port")
|
||||
port_type = Type::CLOUD;
|
||||
|
||||
else
|
||||
return false;
|
||||
|
||||
|
@ -26,6 +26,7 @@ public:
|
||||
QUERIES_ALL,
|
||||
QUERIES_DEFAULT,
|
||||
QUERIES_CUSTOM,
|
||||
CLOUD,
|
||||
END
|
||||
};
|
||||
|
||||
|
@ -161,26 +161,44 @@ void DefaultCoordinator::updateReadingState(InitialAllRangesAnnouncement announc
|
||||
PartRefs parts_diff;
|
||||
|
||||
/// To get rid of duplicates
|
||||
for (auto && part: announcement.description)
|
||||
for (auto && part_ranges: announcement.description)
|
||||
{
|
||||
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
|
||||
[&part] (const Part & other) { return other.description.info.getPartNameV1() == part.info.getPartNameV1(); });
|
||||
Part part{.description = std::move(part_ranges), .replicas = {announcement.replica_num}};
|
||||
const MergeTreePartInfo & announced_part = part.description.info;
|
||||
|
||||
/// We have the same part - add the info about presence on current replica to it
|
||||
if (the_same_it != all_parts_to_read.end())
|
||||
auto it = std::lower_bound(cbegin(all_parts_to_read), cend(all_parts_to_read), part);
|
||||
if (it != all_parts_to_read.cend())
|
||||
{
|
||||
the_same_it->replicas.insert(announcement.replica_num);
|
||||
continue;
|
||||
const MergeTreePartInfo & found_part = it->description.info;
|
||||
if (found_part == announced_part)
|
||||
{
|
||||
/// We have the same part - add the info about presence on current replica
|
||||
it->replicas.insert(announcement.replica_num);
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// check if it is covering or covered part
|
||||
/// need to compare with 2 nearest parts in set, - lesser and greater than the part from the announcement
|
||||
bool is_disjoint = found_part.isDisjoint(announced_part);
|
||||
if (it != all_parts_to_read.cbegin() && is_disjoint)
|
||||
{
|
||||
const MergeTreePartInfo & lesser_part = (--it)->description.info;
|
||||
is_disjoint &= lesser_part.isDisjoint(announced_part);
|
||||
}
|
||||
if (!is_disjoint)
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else if (!all_parts_to_read.empty())
|
||||
{
|
||||
/// the announced part is greatest - check if it's disjoint with lesser part
|
||||
const MergeTreePartInfo & lesser_part = all_parts_to_read.crbegin()->description.info;
|
||||
if (!lesser_part.isDisjoint(announced_part))
|
||||
continue;
|
||||
}
|
||||
|
||||
auto covering_or_the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
|
||||
[&part] (const Part & other) { return !other.description.info.isDisjoint(part.info); });
|
||||
|
||||
/// It is covering part or we have covering - skip it
|
||||
if (covering_or_the_same_it != all_parts_to_read.end())
|
||||
continue;
|
||||
|
||||
auto [insert_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
|
||||
auto [insert_it, _] = all_parts_to_read.emplace(std::move(part));
|
||||
parts_diff.push_back(insert_it);
|
||||
}
|
||||
|
||||
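The rewritten loop above relies on `all_parts_to_read` being an ordered container, so a newly announced part only has to be checked for intersection against its nearest neighbours found via `lower_bound`. A simplified sketch of that idea using plain integer intervals rather than the real `MergeTreePartInfo`:

```cpp
#include <algorithm>
#include <iterator>
#include <set>

/// Sketch: intervals ordered by their left bound; a candidate is inserted only if it is
/// disjoint from the nearest existing interval on each side (the covering-part check above).
struct Interval
{
    int left;
    int right;
    bool operator<(const Interval & other) const { return left < other.left; }
    bool isDisjoint(const Interval & other) const { return right < other.left || other.right < left; }
};

bool tryInsertDisjoint(std::set<Interval> & parts, Interval candidate)
{
    auto it = parts.lower_bound(candidate);
    if (it != parts.end() && !it->isDisjoint(candidate))
        return false; /// intersects the nearest greater-or-equal interval
    if (it != parts.begin() && !std::prev(it)->isDisjoint(candidate))
        return false; /// intersects the nearest lesser interval
    parts.insert(candidate);
    return true;
}
```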
|
@ -6,6 +6,9 @@
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -49,6 +52,17 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
index_granularity = data_settings->index_granularity;
|
||||
merging_params_mode = static_cast<int>(data.merging_params.mode);
|
||||
sign_column = data.merging_params.sign_column;
|
||||
is_deleted_column = data.merging_params.is_deleted_column;
|
||||
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
|
||||
version_column = data.merging_params.version_column;
|
||||
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
|
||||
{
|
||||
SipHash graphite_hash;
|
||||
data.merging_params.graphite_params.updateHash(graphite_hash);
|
||||
WriteBufferFromOwnString wb;
|
||||
writeText(graphite_hash.get128(), wb);
|
||||
graphite_params_hash = std::move(wb.str());
|
||||
}
|
||||
|
||||
/// This code may looks strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
|
||||
/// Now we have two different entities ORDER BY and it's optional prefix -- PRIMARY KEY.
|
||||
@ -90,6 +104,22 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
|
||||
void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
|
||||
{
|
||||
/// Important note: newly added fields must always be appended to the end of the serialized metadata
/// for backward compatibility.

/// In addition, two consecutive fields must not share a prefix, otherwise deserialization may fail.
/// For example, if you have two fields `v1` and `v2` serialized as:
///     if (!v1.empty()) out << "v1: " << v1 << "\n";
///     if (!v2.empty()) out << "v2: " << v2 << "\n";
/// Say `v1` is empty and `v2` is non-empty; then `v1` is absent from the serialized metadata.
/// Later, to deserialize the metadata, `read` sequentially checks each field with `checkString`.
/// When it starts checking for `v1` and `v2`, the metadata buffer looks like this:
/// v2: <v2 value>
/// ^
/// cursor
/// `checkString("v1: ", in)` is called first and moves the cursor to `2` instead of `v`, so the
/// subsequent call `checkString("v2: ", in)` also fails.

out << "metadata format version: 1\n"
|
||||
<< "date column: " << date_column << "\n"
|
||||
<< "sampling expression: " << sampling_expression << "\n"
|
||||
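A small self-contained sketch of the `checkString` pitfall described in the comment above (the field names `v1`/`v2` are the hypothetical ones from that comment):

```cpp
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>

/// Sketch: why two consecutive fields must not share a prefix.
/// Only `v2` was serialized, yet the failed probe for `v1` already consumes input.
bool prefixPitfall()
{
    DB::ReadBufferFromString in("v2: value\n");

    bool has_v1 = DB::checkString("v1: ", in); /// false, but the leading 'v' has been consumed
    bool has_v2 = DB::checkString("v2: ", in); /// also false: the cursor now points at "2: value"
    return has_v1 || has_v2;                   /// false - the real `v2` field was missed
}
```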
@ -121,6 +151,19 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
|
||||
|
||||
if (!constraints.empty())
|
||||
out << "constraints: " << constraints << "\n";
|
||||
|
||||
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
|
||||
{
|
||||
out << "merge parameters format version: " << merge_params_version << "\n";
|
||||
if (!version_column.empty())
|
||||
out << "version column: " << version_column << "\n";
|
||||
if (!is_deleted_column.empty())
|
||||
out << "is_deleted column: " << is_deleted_column << "\n";
|
||||
if (!columns_to_sum.empty())
|
||||
out << "columns to sum: " << columns_to_sum << "\n";
|
||||
if (!graphite_params_hash.empty())
|
||||
out << "graphite hash: " << graphite_params_hash << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
String ReplicatedMergeTreeTableMetadata::toString() const
|
||||
@ -170,6 +213,26 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
|
||||
|
||||
if (checkString("constraints: ", in))
|
||||
in >> constraints >> "\n";
|
||||
|
||||
if (checkString("merge parameters format version: ", in))
|
||||
in >> merge_params_version >> "\n";
|
||||
else
|
||||
merge_params_version = REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION;
|
||||
|
||||
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
|
||||
{
|
||||
if (checkString("version column: ", in))
|
||||
in >> version_column >> "\n";
|
||||
|
||||
if (checkString("is_deleted column: ", in))
|
||||
in >> is_deleted_column >> "\n";
|
||||
|
||||
if (checkString("columns to sum: ", in))
|
||||
in >> columns_to_sum >> "\n";
|
||||
|
||||
if (checkString("graphite hash: ", in))
|
||||
in >> graphite_params_hash >> "\n";
|
||||
}
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)
|
||||
@ -210,6 +273,25 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
|
||||
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sign column. "
|
||||
"Stored in ZooKeeper: {}, local: {}", from_zk.sign_column, sign_column);
|
||||
|
||||
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS && from_zk.merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
|
||||
{
|
||||
if (version_column != from_zk.version_column)
|
||||
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
|
||||
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);
|
||||
|
||||
if (is_deleted_column != from_zk.is_deleted_column)
|
||||
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
|
||||
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);
|
||||
|
||||
if (columns_to_sum != from_zk.columns_to_sum)
|
||||
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
|
||||
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);
|
||||
|
||||
if (graphite_params_hash != from_zk.graphite_params_hash)
|
||||
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in graphite params. "
|
||||
"Stored in ZooKeeper hash: {}, local hash: {}", from_zk.graphite_params_hash, graphite_params_hash);
|
||||
}
|
||||
|
||||
/// NOTE: You can make a less strict check of match expressions so that tables do not break from small changes
|
||||
/// in formatAST code.
|
||||
String parsed_zk_primary_key = formattedAST(KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
|
||||
#include <base/types.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -17,11 +18,20 @@ class ReadBuffer;
|
||||
*/
|
||||
struct ReplicatedMergeTreeTableMetadata
|
||||
{
|
||||
static constexpr int REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION = 1;
|
||||
static constexpr int REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS = 2;
|
||||
|
||||
String date_column;
|
||||
String sampling_expression;
|
||||
UInt64 index_granularity;
|
||||
/// Merging related params
|
||||
int merging_params_mode;
|
||||
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
|
||||
String sign_column;
|
||||
String version_column;
|
||||
String is_deleted_column;
|
||||
String columns_to_sum;
|
||||
String graphite_params_hash;
|
||||
String primary_key;
|
||||
MergeTreeDataFormatVersion data_format_version;
|
||||
String partition_key;
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -28,6 +28,7 @@ NamesAndTypesList StorageSystemMutations::getNamesAndTypes()
|
||||
{ "parts_to_do_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
|
||||
{ "parts_to_do", std::make_shared<DataTypeInt64>() },
|
||||
{ "is_done", std::make_shared<DataTypeUInt8>() },
|
||||
{ "is_killed", std::make_shared<DataTypeUInt8>() },
|
||||
{ "latest_failed_part", std::make_shared<DataTypeString>() },
|
||||
{ "latest_fail_time", std::make_shared<DataTypeDateTime>() },
|
||||
{ "latest_fail_reason", std::make_shared<DataTypeString>() },
|
||||
@ -138,6 +139,7 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr c
|
||||
res_columns[col_num++]->insert(parts_to_do_names);
|
||||
res_columns[col_num++]->insert(parts_to_do_names.size());
|
||||
res_columns[col_num++]->insert(status.is_done);
|
||||
res_columns[col_num++]->insert(status.is_killed);
|
||||
res_columns[col_num++]->insert(status.latest_failed_part);
|
||||
res_columns[col_num++]->insert(UInt64(status.latest_fail_time));
|
||||
res_columns[col_num++]->insert(status.latest_fail_reason);
|
||||
|
@ -285,6 +285,8 @@ StorageSystemPartsBase::StorageSystemPartsBase(const StorageID & table_id_, Name
|
||||
|
||||
auto add_alias = [&](const String & alias_name, const String & column_name)
|
||||
{
|
||||
if (!tmp_columns.has(column_name))
|
||||
return;
|
||||
ColumnDescription column(alias_name, tmp_columns.get(column_name).type);
|
||||
column.default_desc.kind = ColumnDefaultKind::Alias;
|
||||
column.default_desc.expression = std::make_shared<ASTIdentifier>(column_name);
|
||||
|
tests/config/config.d/graphite_alternative.xml (new file, 29 lines)
@ -0,0 +1,29 @@
|
||||
<!-- alternative graphite config, for testing 02910_replicated_merge_parameters_must_consistent -->
|
||||
<clickhouse>
|
||||
<graphite_rollup_alternative>
|
||||
<version_column_name>Version</version_column_name>
|
||||
<pattern>
|
||||
<regexp>sum</regexp>
|
||||
<function>any</function>
|
||||
<retention>
|
||||
<age>0</age>
|
||||
<precision>600</precision>
|
||||
</retention>
|
||||
<retention>
|
||||
<age>17280</age>
|
||||
<precision>6000</precision>
|
||||
</retention>
|
||||
</pattern>
|
||||
<default>
|
||||
<function>any</function>
|
||||
<retention>
|
||||
<age>0</age>
|
||||
<precision>600</precision>
|
||||
</retention>
|
||||
<retention>
|
||||
<age>17280</age>
|
||||
<precision>6000</precision>
|
||||
</retention>
|
||||
</default>
|
||||
</graphite_rollup_alternative>
|
||||
</clickhouse>
|
@ -26,6 +26,7 @@ ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
|
||||
|
@ -2,6 +2,7 @@
|
||||
<profiles>
|
||||
<default>
|
||||
<max_query_size from_env="MAX_QUERY_SIZE" />
|
||||
<max_threads replace="1" from_env="MAX_THREADS">1</max_threads>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
|
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
@ -36,9 +37,13 @@ node7 = cluster.add_instance(
|
||||
"configs/000-config_with_env_subst.xml",
|
||||
"configs/010-env_subst_override.xml",
|
||||
],
|
||||
env_variables={"MAX_QUERY_SIZE": "121212"},
|
||||
env_variables={
|
||||
# overridden with 424242
|
||||
"MAX_QUERY_SIZE": "121212",
|
||||
"MAX_THREADS": "2",
|
||||
},
|
||||
instance_env_variables=True,
|
||||
) # overridden with 424242
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@ -91,6 +96,65 @@ def test_config(start_cluster):
|
||||
node7.query("select value from system.settings where name = 'max_query_size'")
|
||||
== "424242\n"
|
||||
)
|
||||
assert (
|
||||
node7.query("select value from system.settings where name = 'max_threads'")
|
||||
== "2\n"
|
||||
)
|
||||
|
||||
|
||||
def test_config_invalid_overrides(start_cluster):
|
||||
node7.replace_config(
|
||||
"/etc/clickhouse-server/users.d/000-config_with_env_subst.xml",
|
||||
"""
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<max_query_size from_env="MAX_QUERY_SIZE" />
|
||||
<max_threads from_env="MAX_THREADS">100</max_threads>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</default>
|
||||
|
||||
<include incl="users_1" />
|
||||
<include incl="users_2" />
|
||||
</users>
|
||||
</clickhouse>
|
||||
""",
|
||||
)
|
||||
with pytest.raises(
|
||||
QueryRuntimeException,
|
||||
match="Failed to preprocess config '/etc/clickhouse-server/users.xml': Exception: Element <max_threads> has value and does not have 'replace' attribute, can't process from_env substitution",
|
||||
):
|
||||
node7.query("SYSTEM RELOAD CONFIG")
|
||||
node7.replace_config(
|
||||
"/etc/clickhouse-server/users.d/000-config_with_env_subst.xml",
|
||||
"""
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<max_query_size from_env="MAX_QUERY_SIZE" />
|
||||
<max_threads replace="1" from_env="MAX_THREADS">1</max_threads>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</default>
|
||||
|
||||
<include incl="users_1" />
|
||||
<include incl="users_2" />
|
||||
</users>
|
||||
</clickhouse>
|
||||
""",
|
||||
)
|
||||
node7.query("SYSTEM RELOAD CONFIG")
|
||||
|
||||
|
||||
def test_include_config(start_cluster):
|
||||
|
@ -0,0 +1,22 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<test_single_shard_multiple_replicas>
|
||||
<shard>
|
||||
<internal_replication>true</internal_replication>
|
||||
<replica>
|
||||
<host>n1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>n2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>n3</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_single_shard_multiple_replicas>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
||||
|
tests/integration/test_parallel_replicas_working_set/test.py (new file, 140 lines)
@ -0,0 +1,140 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
nodes = [
|
||||
cluster.add_instance(
|
||||
f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
|
||||
)
|
||||
for i in (1, 2, 3)
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def create_tables(cluster, table_name, node_with_covering_part):
|
||||
# create replicated tables
|
||||
for node in nodes:
|
||||
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
|
||||
|
||||
nodes[0].query(
|
||||
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1')
|
||||
ORDER BY (key)"""
|
||||
)
|
||||
nodes[1].query(
|
||||
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2')
|
||||
ORDER BY (key)"""
|
||||
)
|
||||
nodes[2].query(
|
||||
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3')
|
||||
ORDER BY (key)"""
|
||||
)
|
||||
# stop merges to keep original parts
|
||||
# stop fetches to keep only parts created on the nodes
|
||||
for i in (0, 1, 2):
|
||||
if i != node_with_covering_part:
|
||||
nodes[i].query(f"system stop fetches {table_name}")
|
||||
nodes[i].query(f"system stop merges {table_name}")
|
||||
|
||||
# populate data, equal number of rows for each replica
|
||||
nodes[0].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10)",
|
||||
)
|
||||
nodes[0].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10, 10)"
|
||||
)
|
||||
nodes[1].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(20, 10)"
|
||||
)
|
||||
nodes[1].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(30, 10)"
|
||||
)
|
||||
nodes[2].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(40, 10)"
|
||||
)
|
||||
nodes[2].query(
|
||||
f"INSERT INTO {table_name} SELECT number, number FROM numbers(50, 10)"
|
||||
)
|
||||
nodes[node_with_covering_part].query(f"system sync replica {table_name}")
|
||||
nodes[node_with_covering_part].query(f"optimize table {table_name}")
|
||||
|
||||
# check we have expected set of parts
|
||||
expected_active_parts = ""
|
||||
if node_with_covering_part == 0:
|
||||
expected_active_parts = (
|
||||
"all_0_5_1\nall_2_2_0\nall_3_3_0\nall_4_4_0\nall_5_5_0\n"
|
||||
)
|
||||
|
||||
if node_with_covering_part == 1:
|
||||
expected_active_parts = (
|
||||
"all_0_0_0\nall_0_5_1\nall_1_1_0\nall_4_4_0\nall_5_5_0\n"
|
||||
)
|
||||
|
||||
if node_with_covering_part == 2:
|
||||
expected_active_parts = (
|
||||
"all_0_0_0\nall_0_5_1\nall_1_1_0\nall_2_2_0\nall_3_3_0\n"
|
||||
)
|
||||
|
||||
assert (
|
||||
nodes[0].query(
|
||||
f"select distinct name from clusterAllReplicas({cluster}, system.parts) where table='{table_name}' and active order by name"
|
||||
)
|
||||
== expected_active_parts
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_with_covering_part", [0, 1, 2])
|
||||
def test_covering_part_in_announcement(start_cluster, node_with_covering_part):
|
||||
"""create and populate table in special way (see create_table()),
|
||||
node_with_covering_part contains all parts merged into one,
|
||||
other nodes contain only parts which are result of insert via the node
|
||||
"""
|
||||
|
||||
cluster = "test_single_shard_multiple_replicas"
|
||||
table_name = "test_table"
|
||||
create_tables(cluster, table_name, node_with_covering_part)
|
||||
|
||||
# query result can be one of the following outcomes
|
||||
# (1) query result if parallel replicas working set contains all_0_5_1
|
||||
expected_full_result = "60\t0\t59\t1770\n"
|
||||
expected_results = {expected_full_result}
|
||||
|
||||
# (2) query result if parallel replicas working set DOESN'T contain all_0_5_1
|
||||
if node_with_covering_part == 0:
|
||||
expected_results.add("40\t20\t59\t1580\n")
|
||||
if node_with_covering_part == 1:
|
||||
expected_results.add("40\t0\t59\t1180\n")
|
||||
if node_with_covering_part == 2:
|
||||
expected_results.add("40\t0\t39\t780\n")
|
||||
|
||||
# parallel replicas
|
||||
result = nodes[0].query(
|
||||
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}",
|
||||
settings={
|
||||
"allow_experimental_parallel_reading_from_replicas": 2,
|
||||
"prefer_localhost_replica": 0,
|
||||
"max_parallel_replicas": 3,
|
||||
"use_hedged_requests": 0,
|
||||
"cluster_for_parallel_replicas": cluster,
|
||||
},
|
||||
)
|
||||
assert result in expected_results
|
||||
|
||||
# w/o parallel replicas
|
||||
assert (
|
||||
nodes[node_with_covering_part].query(
|
||||
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}",
|
||||
settings={
|
||||
"allow_experimental_parallel_reading_from_replicas": 0,
|
||||
},
|
||||
)
|
||||
== expected_full_result
|
||||
)
|
@ -133,6 +133,7 @@ SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS
|
||||
SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP REPLICATION QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM
|
||||
SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM
|
||||
SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM
|
||||
SYSTEM REPLICA READINESS ['SYSTEM REPLICA READY','SYSTEM REPLICA UNREADY'] GLOBAL SYSTEM
|
||||
SYSTEM RESTART REPLICA ['RESTART REPLICA'] TABLE SYSTEM
|
||||
SYSTEM RESTORE REPLICA ['RESTORE REPLICA'] TABLE SYSTEM
|
||||
SYSTEM WAIT LOADING PARTS ['WAIT LOADING PARTS'] TABLE SYSTEM
|
||||
|
@ -1,3 +1,3 @@
|
||||
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\n
|
||||
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\nmerge parameters format version: 2\nversion column: version\n
|
||||
1
|
||||
1
|
||||
|
@ -1,2 +1,2 @@
|
||||
CREATE TABLE default.x\n(\n `i` Int32,\n INDEX mm log2(i) TYPE minmax GRANULARITY 1,\n INDEX nn log2(i) TYPE minmax GRANULARITY 1,\n PROJECTION p\n (\n SELECT max(i)\n ),\n PROJECTION p2\n (\n SELECT min(i)\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/default/x\', \'r\')\nORDER BY i\nSETTINGS index_granularity = 8192
|
||||
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\n
|
||||
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\nmerge parameters format version: 2\n
|
||||
|
@ -406,6 +406,7 @@ CREATE TABLE system.mutations
|
||||
`parts_to_do_names` Array(String),
|
||||
`parts_to_do` Int64,
|
||||
`is_done` UInt8,
|
||||
`is_killed` UInt8,
|
||||
`latest_failed_part` String,
|
||||
`latest_fail_time` DateTime,
|
||||
`latest_fail_reason` String
|
||||
|
@ -0,0 +1,80 @@
|
||||
-- Tags: zookeeper, no-replicated-database
|
||||
CREATE TABLE t
|
||||
(
|
||||
`id` UInt64,
|
||||
`val` String,
|
||||
`legacy_ver` UInt64,
|
||||
)
|
||||
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r1', legacy_ver)
|
||||
ORDER BY id;
|
||||
|
||||
CREATE TABLE t_r
|
||||
(
|
||||
`id` UInt64,
|
||||
`val` String,
|
||||
`legacy_ver` UInt64
|
||||
)
|
||||
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
|
||||
ORDER BY id; -- { serverError METADATA_MISMATCH }
|
||||
|
||||
CREATE TABLE t2
|
||||
(
|
||||
`id` UInt64,
|
||||
`val` String,
|
||||
`legacy_ver` UInt64,
|
||||
`deleted` UInt8
|
||||
)
|
||||
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
|
||||
ORDER BY id;
|
||||
|
||||
CREATE TABLE t2_r
|
||||
(
|
||||
`id` UInt64,
|
||||
`val` String,
|
||||
`legacy_ver` UInt64,
|
||||
`deleted` UInt8
|
||||
)
|
||||
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
|
||||
ORDER BY id; -- { serverError METADATA_MISMATCH }
|
||||
|
||||
CREATE TABLE t3
|
||||
(
|
||||
`key` UInt64,
|
||||
`metrics1` UInt64,
|
||||
`metrics2` UInt64
|
||||
)
|
||||
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r1', metrics1)
|
||||
ORDER BY key;
|
||||
|
||||
CREATE TABLE t3_r
|
||||
(
|
||||
`key` UInt64,
|
||||
`metrics1` UInt64,
|
||||
`metrics2` UInt64
|
||||
)
|
||||
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r2', metrics2)
|
||||
ORDER BY key; -- { serverError METADATA_MISMATCH }
|
||||
|
||||
CREATE TABLE t4
|
||||
(
|
||||
`key` UInt32,
|
||||
`Path` String,
|
||||
`Time` DateTime('UTC'),
|
||||
`Value` Float64,
|
||||
`Version` UInt32,
|
||||
`col` UInt64
|
||||
)
|
||||
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r1', 'graphite_rollup')
|
||||
ORDER BY key;
|
||||
|
||||
CREATE TABLE t4_r
|
||||
(
|
||||
`key` UInt32,
|
||||
`Path` String,
|
||||
`Time` DateTime('UTC'),
|
||||
`Value` Float64,
|
||||
`Version` UInt32,
|
||||
`col` UInt64
|
||||
)
|
||||
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r2', 'graphite_rollup_alternative')
|
||||
ORDER BY key; -- { serverError METADATA_MISMATCH }
|