dbms: system.replicas: improvement [#METR-12726].

This commit is contained in:
Alexey Milovidov 2014-10-08 00:04:40 +04:00
parent 48ec87bc40
commit 49df24d891

View File

@ -4,6 +4,7 @@
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Storages/StorageSystemReplicas.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/VirtualColumnUtils.h>
namespace DB
@ -14,7 +15,7 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_, const Co
: name(name_), context(context_)
, columns{
{ "database", new DataTypeString },
{ "name", new DataTypeString },
{ "table", new DataTypeString },
{ "engine", new DataTypeString },
{ "is_leader", new DataTypeUInt8 },
{ "is_readonly", new DataTypeUInt8 },
@ -75,8 +76,33 @@ BlockInputStreams StorageSystemReplicas::read(
}
ColumnWithNameAndType col_database { new ColumnString, new DataTypeString, "database"};
ColumnWithNameAndType col_name { new ColumnString, new DataTypeString, "name"};
ColumnWithNameAndType col_table { new ColumnString, new DataTypeString, "table"};
ColumnWithNameAndType col_engine { new ColumnString, new DataTypeString, "engine"};
for (auto & db : replicated_tables)
{
for (auto & table : db.second)
{
col_database.column->insert(db.first);
col_table.column->insert(table.first);
col_engine.column->insert(table.second->getName());
}
}
/// Определяем, какие нужны таблицы, по условиям в запросе.
{
Block filtered_block { col_database, col_table, col_engine };
VirtualColumnUtils::filterBlockWithQuery(query, filtered_block, context);
if (!filtered_block.rows())
return BlockInputStreams();
col_database = filtered_block.getByName("database");
col_table = filtered_block.getByName("table");
col_engine = filtered_block.getByName("engine");
}
ColumnWithNameAndType col_is_leader { new ColumnUInt8, new DataTypeUInt8, "is_leader"};
ColumnWithNameAndType col_is_readonly { new ColumnUInt8, new DataTypeUInt8, "is_readonly"};
ColumnWithNameAndType col_is_session_expired{ new ColumnUInt8, new DataTypeUInt8, "is_session_expired"};
@ -94,39 +120,35 @@ BlockInputStreams StorageSystemReplicas::read(
ColumnWithNameAndType col_total_replicas { new ColumnUInt8, new DataTypeUInt8, "total_replicas"};
ColumnWithNameAndType col_active_replicas { new ColumnUInt8, new DataTypeUInt8, "active_replicas"};
for (auto & db : replicated_tables)
for (size_t i = 0, size = col_database.column->size(); i < size; ++i)
{
for (auto & table : db.second)
{
col_database.column->insert(db.first);
col_name.column->insert(table.first);
col_engine.column->insert(table.second->getName());
StorageReplicatedMergeTree::Status status;
typeid_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database.column)[i].safeGet<const String &>()]
[(*col_table.column)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
StorageReplicatedMergeTree::Status status;
typeid_cast<StorageReplicatedMergeTree &>(*table.second).getStatus(status, with_zk_fields);
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
col_future_parts .column->insert(UInt64(status.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
col_future_parts .column->insert(UInt64(status.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
col_queue_size .column->insert(UInt64(status.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.merges_in_queue));
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}
Block block{
col_database,
col_name,
col_table,
col_engine,
col_is_leader,
col_is_readonly,