mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Fixes to code, build and style checks
This commit is contained in:
parent
8dacfde3d3
commit
1e579ac375
@ -14,7 +14,7 @@ Columns:
|
||||
- `initiator` ([String](../../sql-reference/data-types/string.md)) - Nod that executed the query.
|
||||
- `query_start_time` ([Date](../../sql-reference/data-types/date.md)) — Query start time.
|
||||
- `query_finish_time` ([Date](../../sql-reference/data-types/date.md)) — Query finish time.
|
||||
- `query_duration_ms` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Duration of query execution in milliseconds.
|
||||
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/datetime64.md)) — Duration of query execution in milliseconds.
|
||||
- `exception_code` ([Enum](../../sql-reference/data-types/enum.md)) - Exception code from ZooKeeper.
|
||||
|
||||
|
||||
@ -35,7 +35,7 @@ entry: query-0000000000
|
||||
host_name: clickhouse01
|
||||
host_address: 172.23.0.11
|
||||
port: 9000
|
||||
status: finished
|
||||
status: Finished
|
||||
cluster: test_cluster
|
||||
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
|
||||
initiator: clickhouse01:9000
|
||||
@ -50,7 +50,7 @@ entry: query-0000000000
|
||||
host_name: clickhouse02
|
||||
host_address: 172.23.0.12
|
||||
port: 9000
|
||||
status: finished
|
||||
status: Finished
|
||||
cluster: test_cluster
|
||||
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
|
||||
initiator: clickhouse01:9000
|
||||
|
@ -13,7 +13,6 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Storages/StorageDistributed.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <algorithm>
|
||||
#include <filesystem>
|
||||
#include <string>
|
||||
|
||||
#include "StorageSystemDDLWorkerQueue.h"
|
||||
|
||||
@ -28,27 +27,21 @@ namespace fs = std::filesystem;
|
||||
|
||||
enum Status
|
||||
{
|
||||
active,
|
||||
finished,
|
||||
unknown,
|
||||
errored
|
||||
ACTIVE,
|
||||
FINISHED,
|
||||
UNKNOWN,
|
||||
ERRORED
|
||||
};
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
|
||||
{
|
||||
return std::vector<std::pair<String, Int8>>{
|
||||
{"active", static_cast<Int8>(Status::active)},
|
||||
{"finished", static_cast<Int8>(Status::finished)},
|
||||
{"unknown", static_cast<Int8>(Status::unknown)},
|
||||
{"errored", static_cast<Int8>(Status::errored)},
|
||||
{"Active", static_cast<Int8>(Status::ACTIVE)},
|
||||
{"Finished", static_cast<Int8>(Status::FINISHED)},
|
||||
{"Unknown", static_cast<Int8>(Status::UNKNOWN)},
|
||||
{"Errored", static_cast<Int8>(Status::ERRORED)},
|
||||
};
|
||||
}
|
||||
|
||||
@ -102,6 +95,21 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
|
||||
};
|
||||
}
|
||||
|
||||
static String clusterNameFromDDLQuery(const Context & context, const DDLLogEntry & entry)
|
||||
{
|
||||
const char * begin = entry.query.data();
|
||||
const char * end = begin + entry.query.size();
|
||||
ASTPtr query;
|
||||
ASTQueryWithOnCluster * query_on_cluster;
|
||||
String cluster_name;
|
||||
ParserQuery parser_query(end);
|
||||
String description;
|
||||
query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
|
||||
if (query && (query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get())))
|
||||
cluster_name = query_on_cluster->cluster;
|
||||
return cluster_name;
|
||||
}
|
||||
|
||||
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
|
||||
{
|
||||
zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper();
|
||||
@ -160,8 +168,6 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
|
||||
zkutil::Strings active_nodes;
|
||||
zkutil::Strings finished_nodes;
|
||||
|
||||
// on error just skip and continue.
|
||||
|
||||
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
zk_exception_code = code;
|
||||
@ -178,17 +184,17 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
|
||||
*/
|
||||
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
|
||||
{
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::active));
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::ACTIVE));
|
||||
}
|
||||
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
|
||||
{
|
||||
if (pool_status[replica_index].error_count != 0)
|
||||
{
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::errored));
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::ERRORED));
|
||||
}
|
||||
else
|
||||
{
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::finished));
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::FINISHED));
|
||||
}
|
||||
// regardless of the status finished or errored, the node host_name:port entry was found under the /finished path
|
||||
// & should be able to get the contents of the znode at /finished path.
|
||||
@ -198,12 +204,11 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
|
||||
}
|
||||
else
|
||||
{
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::unknown));
|
||||
res_columns[i++]->insert(static_cast<Int8>(Status::UNKNOWN));
|
||||
}
|
||||
|
||||
Coordination::GetResponse res;
|
||||
if (!futures.empty())
|
||||
res = futures[query_id].get();
|
||||
res = futures[query_id].get();
|
||||
|
||||
auto query_start_time = res.stat.mtime;
|
||||
|
||||
@ -223,19 +228,4 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
|
||||
}
|
||||
}
|
||||
}
|
||||
String StorageSystemDDLWorkerQueue::clusterNameFromDDLQuery(const Context & context, const DDLLogEntry & entry) const
|
||||
{
|
||||
const char * begin = entry.query.data();
|
||||
const char * end = begin + entry.query.size();
|
||||
ASTPtr query;
|
||||
ASTQueryWithOnCluster * query_on_cluster;
|
||||
String cluster_name = "";
|
||||
ParserQuery parser_query(end);
|
||||
String description;
|
||||
query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
|
||||
if (query && (query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get())))
|
||||
cluster_name = query_on_cluster->cluster;
|
||||
return cluster_name;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,6 +30,5 @@ public:
|
||||
std::string getName() const override { return "SystemDDLWorkerQueue"; }
|
||||
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
String clusterNameFromDDLQuery(const Context & context, const DB::DDLLogEntry & entry) const;
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user