codestyle

Nikita Mikhaylov 2020-02-19 18:45:49 +03:00
parent 24cbd0d6d1
commit 819a4ad6da
4 changed files with 55 additions and 55 deletions
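The diff below applies a single whitespace convention across the copier sources: a reference or pointer declarator gets a space on both sides (`const String & name`, `Poco::Logger * log`), and `[[maybe_unused]]` attributes are dropped from declarations that are in fact used. A minimal, hypothetical illustration of the target style (the function and types here are made up, not taken from the diff):

```cpp
#include <iostream>
#include <string>

// Hypothetical example only: the old spelling would have been
//     void greet(const std::string &name, std::ostream *out);
// while the style applied by this commit puts a space on both sides of '&'/'*'.
void greet(const std::string & name, std::ostream * out)
{
    *out << "Hello, " << name << "!\n";
}

int main()
{
    greet("ClickHouse", &std::cout);
    return 0;
}
```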

View File

@@ -14,10 +14,10 @@ class ClusterCopier
 {
 public:
-ClusterCopier(const String &task_path_,
-const String &host_id_,
-const String &proxy_database_name_,
-Context &context_)
+ClusterCopier(const String & task_path_,
+const String & host_id_,
+const String & proxy_database_name_,
+Context & context_)
 :
 task_zookeeper_path(task_path_),
 host_id(host_id_),
@@ -28,14 +28,14 @@ public:
 void init();
 template<typename T>
-decltype(auto) retry(T &&func, UInt64 max_tries = 100);
+decltype(auto) retry(T && func, UInt64 max_tries = 100);
 void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard) ;
 /// Compute set of partitions, assume set of partitions aren't changed during the processing
-void discoverTablePartitions(const ConnectionTimeouts &timeouts, TaskTable &task_table, UInt64 num_threads = 0);
+void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0);
-void uploadTaskDescription(const std::string &task_path, const std::string &task_file, const bool force);
+void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force);
 void reloadTaskDescription();
@@ -82,26 +82,26 @@ protected:
 * We have to ensure that all shards have the finished state and there is no dirty flag.
 * Moreover, we have to check status twice and check zxid, because state can change during the checking.
 */
-bool checkPartitionIsDone(const TaskTable &task_table, const String &partition_name,
-const TasksShard &shards_with_partition);
+bool checkPartitionIsDone(const TaskTable & task_table, const String & partition_name,
+const TasksShard & shards_with_partition);
 /// Removes MATERIALIZED and ALIAS columns from create table query
 static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr &query_ast);
 /// Replaces ENGINE and table name in a create query
 std::shared_ptr<ASTCreateQuery>
-rewriteCreateQueryStorage(const ASTPtr &create_query_ast, const DatabaseAndTableName &new_table,
-const ASTPtr &new_storage_ast);
+rewriteCreateQueryStorage(const ASTPtr & create_query_ast, const DatabaseAndTableName & new_table,
+const ASTPtr & new_storage_ast);
-bool tryDropPartition(ShardPartition &task_partition,
-const zkutil::ZooKeeperPtr &zookeeper,
-const CleanStateClock &clean_state_clock);
+bool tryDropPartition(ShardPartition & task_partition,
+const zkutil::ZooKeeperPtr & zookeeper,
+const CleanStateClock & clean_state_clock);
 static constexpr UInt64 max_table_tries = 1000;
 static constexpr UInt64 max_shard_partition_tries = 600;
-bool tryProcessTable(const ConnectionTimeouts &timeouts, TaskTable &task_table);
+bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
 PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
 ShardPartition & task_partition,
@@ -137,10 +137,10 @@ protected:
 * Returns number of shards for which at least one replica executed query successfully
 */
 UInt64 executeQueryOnCluster(
-const ClusterPtr &cluster,
-const String &query,
-const ASTPtr &query_ast_ = nullptr,
-const Settings *settings = nullptr,
+const ClusterPtr & cluster,
+const String & query,
+const ASTPtr & query_ast_ = nullptr,
+const Settings * settings = nullptr,
 PoolMode pool_mode = PoolMode::GET_ALL,
 UInt64 max_successful_executions_per_shard = 0) const;
@@ -166,8 +166,8 @@ private:
 bool is_safe_mode = false;
 double copy_fault_probability = 0.0;
-Context &context;
-Poco::Logger *log;
+Context & context;
+Poco::Logger * log;
 std::chrono::milliseconds default_sleep_time{1000};
 };
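For context on the `retry` declaration shown above, here is a rough sketch of what such a helper could look like. This is an assumption for illustration, not the implementation from the repository; the fixed one-second sleep and plain `std::uint64_t` (in place of ClickHouse's `UInt64`) are stand-ins.

```cpp
#include <chrono>
#include <cstdint>
#include <exception>
#include <thread>

// Sketch only: call func() up to max_tries times, sleeping between attempts,
// and rethrow the last exception if every attempt failed.
template <typename T>
decltype(auto) retry(T && func, std::uint64_t max_tries = 100)
{
    std::exception_ptr last_exception;
    for (std::uint64_t try_number = 1; try_number <= max_tries; ++try_number)
    {
        try
        {
            return func();
        }
        catch (...)
        {
            last_exception = std::current_exception();
            if (try_number < max_tries)
                std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
    std::rethrow_exception(last_exception);
}
```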

View File

@@ -3,14 +3,14 @@
 namespace DB {
-[[maybe_unused]] ConfigurationPtr getConfigurationFromXMLString(const std::string &xml_data) {
+ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data) {
 std::stringstream ss(xml_data);
 Poco::XML::InputSource input_source{ss};
 return {new Poco::Util::XMLConfiguration{&input_source}};
 }
-String getQuotedTable(const String &database, const String &table) {
+String getQuotedTable(const String & database, const String & table) {
 if (database.empty()) {
 return backQuoteIfNeed(table);
 }
@@ -18,15 +18,15 @@ String getQuotedTable(const String &database, const String &table) {
 return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
 }
-String getQuotedTable(const DatabaseAndTableName &db_and_table) {
+String getQuotedTable(const DatabaseAndTableName & db_and_table) {
 return getQuotedTable(db_and_table.first, db_and_table.second);
 }
 // Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
 std::shared_ptr<ASTStorage> createASTStorageDistributed(
-const String &cluster_name, const String &database, const String &table,
-const ASTPtr &sharding_key_ast) {
+const String & cluster_name, const String & database, const String & table,
+const ASTPtr & sharding_key_ast) {
 auto args = std::make_shared<ASTExpressionList>();
 args->children.emplace_back(std::make_shared<ASTLiteral>(cluster_name));
 args->children.emplace_back(std::make_shared<ASTIdentifier>(database));
@@ -45,28 +45,28 @@ std::shared_ptr<ASTStorage> createASTStorageDistributed(
 }
-BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr &stream) {
+BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream) {
 return std::make_shared<SquashingBlockInputStream>(
 stream,
 std::numeric_limits<size_t>::max(),
 std::numeric_limits<size_t>::max());
 }
-Block getBlockWithAllStreamData(const BlockInputStreamPtr &stream) {
+Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream) {
 return squashStreamIntoOneBlock(stream)->read();
 }
-bool isExtendedDefinitionStorage(const ASTPtr &storage_ast) {
-const auto &storage = storage_ast->as<ASTStorage &>();
+bool isExtendedDefinitionStorage(const ASTPtr & storage_ast) {
+const auto & storage = storage_ast->as<ASTStorage &>();
 return storage.partition_by || storage.order_by || storage.sample_by;
 }
-ASTPtr extractPartitionKey(const ASTPtr &storage_ast) {
+ASTPtr extractPartitionKey(const ASTPtr & storage_ast) {
 String storage_str = queryToString(storage_ast);
-const auto &storage = storage_ast->as<ASTStorage &>();
-const auto &engine = storage.engine->as<ASTFunction &>();
+const auto & storage = storage_ast->as<ASTStorage &>();
+const auto & engine = storage.engine->as<ASTFunction &>();
 if (!endsWith(engine.name, "MergeTree")) {
 throw Exception(
@@ -78,7 +78,7 @@ ASTPtr extractPartitionKey(const ASTPtr &storage_ast) {
 if (storage.partition_by)
 return storage.partition_by->clone();
-static const char *all = "all";
+static const char * all = "all";
 return std::make_shared<ASTLiteral>(Field(all, strlen(all)));
 } else {
 bool is_replicated = startsWith(engine.name, "Replicated");
@@ -88,18 +88,18 @@ ASTPtr extractPartitionKey(const ASTPtr &storage_ast) {
 throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
 ASTPtr arguments_ast = engine.arguments->clone();
-ASTs &arguments = arguments_ast->children;
+ASTs & arguments = arguments_ast->children;
 if (arguments.size() < min_args)
 throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str,
 ErrorCodes::BAD_ARGUMENTS);
-ASTPtr &month_arg = is_replicated ? arguments[2] : arguments[1];
+ASTPtr & month_arg = is_replicated ? arguments[2] : arguments[1];
 return makeASTFunction("toYYYYMM", month_arg->clone());
 }
 }
-[[maybe_unused]] ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random) {
+ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random) {
 ShardPriority res;
 if (replicas.empty())

View File

@@ -81,11 +81,11 @@ namespace ErrorCodes
 }
-[[maybe_unused]] ConfigurationPtr getConfigurationFromXMLString(const std::string &xml_data);
+ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data);
-[[maybe_unused]] String getQuotedTable(const String &database, const String &table);
+String getQuotedTable(const String & database, const String & table);
-[[maybe_unused]] String getQuotedTable(const DatabaseAndTableName &db_and_table);
+String getQuotedTable(const DatabaseAndTableName & db_and_table);
 enum class TaskState {
@@ -98,7 +98,7 @@ enum class TaskState {
 struct TaskStateWithOwner {
 TaskStateWithOwner() = default;
-TaskStateWithOwner(TaskState state_, const String &owner_) : state(state_), owner(owner_) {}
+TaskStateWithOwner(TaskState state_, const String & owner_) : state(state_), owner(owner_) {}
 TaskState state{TaskState::Unknown};
 String owner;
@@ -136,7 +136,7 @@ struct ShardPriority
 size_t hostname_difference = 0;
 UInt8 random = 0;
-static bool greaterPriority(const ShardPriority &current, const ShardPriority &other) {
+static bool greaterPriority(const ShardPriority & current, const ShardPriority & other) {
 return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random)
 < std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random);
 }
@@ -158,19 +158,19 @@ struct MultiTransactionInfo {
 };
 // Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
-[[maybe_unused]] std::shared_ptr<ASTStorage> createASTStorageDistributed(
-const String &cluster_name, const String &database, const String &table,
-const ASTPtr &sharding_key_ast = nullptr);
+std::shared_ptr<ASTStorage> createASTStorageDistributed(
+const String & cluster_name, const String & database, const String & table,
+const ASTPtr & sharding_key_ast = nullptr);
-[[maybe_unused]] BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr &stream);
+BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream);
-[[maybe_unused]] Block getBlockWithAllStreamData(const BlockInputStreamPtr &stream);
+Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream);
-[[maybe_unused]] bool isExtendedDefinitionStorage(const ASTPtr &storage_ast);
+bool isExtendedDefinitionStorage(const ASTPtr & storage_ast);
-[[maybe_unused]] ASTPtr extractPartitionKey(const ASTPtr &storage_ast);
+ASTPtr extractPartitionKey(const ASTPtr & storage_ast);
-[[maybe_unused]] ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random);
+ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random);
 }
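A side note on `ShardPriority::greaterPriority` above: `std::forward_as_tuple` makes the comparison lexicographic, so a local shard (`is_remote == 0`) beats a remote one, then a smaller hostname difference wins, then the random tie-breaker decides. A self-contained sketch with simplified stand-in fields (not the actual ClickHouse types):

```cpp
#include <cstddef>
#include <iostream>
#include <tuple>

// Simplified stand-in for ShardPriority: smaller tuples compare as "better".
struct Priority
{
    int is_remote = 1;
    std::size_t hostname_difference = 0;
    unsigned char random = 0;
};

bool greaterPriority(const Priority & current, const Priority & other)
{
    return std::forward_as_tuple(current.is_remote, current.hostname_difference, current.random)
        < std::forward_as_tuple(other.is_remote, other.hostname_difference, other.random);
}

int main()
{
    Priority local{0, 3, 7};   // local replica, somewhat different hostname
    Priority remote{1, 0, 0};  // remote replica
    std::cout << std::boolalpha << greaterPriority(local, remote) << "\n"; // prints: true
    return 0;
}
```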

View File

@@ -221,14 +221,14 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
 }
 template<typename RandomEngine>
-inline void TaskTable::initShards(RandomEngine &&random_engine) {
+inline void TaskTable::initShards(RandomEngine && random_engine) {
 const String & fqdn_name = getFQDNOrHostName();
 std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
 // Compute the priority
-for (auto &shard_info : cluster_pull->getShardsInfo()) {
+for (auto & shard_info : cluster_pull->getShardsInfo()) {
 TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
-const auto &replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
+const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
 task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
 all_shards.emplace_back(task_shard);
@@ -236,13 +236,13 @@ inline void TaskTable::initShards(RandomEngine &&random_engine) {
 // Sort by priority
 std::sort(all_shards.begin(), all_shards.end(),
-[](const TaskShardPtr &lhs, const TaskShardPtr &rhs) {
+[](const TaskShardPtr & lhs, const TaskShardPtr & rhs) {
 return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
 });
 // Cut local shards
 auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
-[](const TaskShardPtr &lhs, UInt8 is_remote) {
+[](const TaskShardPtr & lhs, UInt8 is_remote) {
 return lhs->priority.is_remote < is_remote;
 });
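The last hunk pairs `std::sort` with `std::lower_bound` to separate local shards from remote ones: once the sort puts local shards first, the first element whose `is_remote` flag compares greater than or equal to 1 marks the start of the remote range. A self-contained sketch of that pattern with made-up stand-in types (not the `TaskShardPtr` from the diff):

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

// Made-up stand-in for a shard entry, used only to demonstrate the pattern.
struct FakeShard
{
    bool is_remote = false;
    int id = 0;
};

int main()
{
    std::vector<FakeShard> shards{{true, 1}, {false, 2}, {true, 3}, {false, 4}};

    // Local shards (is_remote == false) first, mirroring the priority sort above.
    std::sort(shards.begin(), shards.end(),
              [](const FakeShard & lhs, const FakeShard & rhs) { return lhs.is_remote < rhs.is_remote; });

    // First element whose is_remote compares >= 1, i.e. the first remote shard.
    auto it_first_remote = std::lower_bound(shards.begin(), shards.end(), 1,
                                            [](const FakeShard & lhs, int is_remote) { return lhs.is_remote < is_remote; });

    std::cout << "local shards: " << (it_first_remote - shards.begin()) << "\n"; // prints: local shards: 2
    return 0;
}
```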