ClickHouse/dbms/src/Storages/System/StorageSystemReplicas.cpp

203 lines
9.6 KiB
C++
Raw Normal View History

#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
2014-11-08 23:55:24 +00:00
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataStreams/OneBlockInputStream.h>
2015-09-24 03:50:09 +00:00
#include <DB/Storages/System/StorageSystemReplicas.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/VirtualColumnUtils.h>
Squashed commit of the following: commit f9b478181cd49224154cc350fb57df7121842f1c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 04:06:36 2016 +0300 Database engines: development [#METR-19997]. commit f7a10a67761ccfd05f3dac32d6444920cd8d4d60 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 03:44:37 2016 +0300 Database engines: development [#METR-19997]. commit bd98a8558e98bad2bed278e5762c4e0fc66e6f38 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 00:33:59 2016 +0300 Database engines: development [#METR-19997]. commit 19712fd884c22a4e2c2b67474086dea8f44e7c7b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 00:03:11 2016 +0300 Database engines: development [#METR-19997]. commit 50274d6df7e91fcc34aab8a8c72347daa2c6512f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 23:24:57 2016 +0300 Database engines: development [#METR-19997]. commit 4a0b99b19b34e90ef8b7be2d199f6232e36ef3f7 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 22:50:36 2016 +0300 Database engines: development [#METR-19997]. commit 44ff3ebba7a3e460a27a89f31ddf199dbea1d182 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 15:09:17 2016 +0300 Database engines: development [#METR-19997]. commit 137c31f3004cfd282473b6acb01cbe1b4ca2aadd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 03:26:34 2016 +0300 Database engines: development [#METR-19997]. commit aa4c0496d4afe4a691164254be2bd5600542b38a Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 03:22:59 2016 +0300 Database engines: development [#METR-19997]. commit 5a94d1f0607450a2dac28a4d7df8b1393a864c23 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 01:02:40 2016 +0300 Database engines: development [#METR-19997]. 
commit 50fd5b52ea1141955a5dfba0dcb191f3289ac25b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 23:23:40 2016 +0300 Database engines: development [#METR-19997]. commit a333d91b058e4f56dd83a6d2878c3c2bd8efc002 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 20:29:07 2016 +0300 Database engines: development [#METR-19997]. commit f81d366e7ac8348436f2698d040f8e341743a024 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 01:30:23 2016 +0300 Database engines: development [#METR-19997]. commit d0696860c9060827896214c08d147c759ea79376 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 21:55:31 2016 +0300 Database engines: development [#METR-19997]. commit 46a168c2ada140a0e95cd8d4b9d8ba9bac855d11 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 08:00:58 2016 +0300 Database engines: development [#METR-19997]. commit 20a2bad161454225fc1b5f9b919b842fbebc3231 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 06:51:10 2016 +0300 Database engines: development [#METR-19997]. commit ca0a77fcc2a8d0b276eb3743c53551ad3fe16314 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 06:02:20 2016 +0300 Reverted erroneous modification [#METR-19997]. commit 1370bdcc4594182f6ef2b146f9afabfe1c295080 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 00:41:34 2016 +0300 Database engines: development [#METR-19997]. commit 16e72c67041cae6471509d3f0f3d4a9aa7b7dc0f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Tue Mar 15 00:41:48 2016 +0300 Database engines: development [#METR-19997].
2016-03-19 01:18:49 +00:00
#include <DB/Databases/IDatabase.h>
namespace DB
{
/** Constructs the storage object.
  * Stores `name_` in the `name` member and fills `columns` with the
  * (column name, data type) pair of every column this table exposes.
  * The body is intentionally empty: all the work is done in the initializer list.
  */
StorageSystemReplicas::StorageSystemReplicas(const std::string & name_)
    : name(name_)
    , columns{
        /// String-typed identification of the table.
        { "database",                   std::make_shared<DataTypeString>()   },
        { "table",                      std::make_shared<DataTypeString>()   },
        { "engine",                     std::make_shared<DataTypeString>()   },

        /// Values taken from the replica status.
        { "is_leader",                  std::make_shared<DataTypeUInt8>()    },
        { "is_readonly",                std::make_shared<DataTypeUInt8>()    },
        { "is_session_expired",         std::make_shared<DataTypeUInt8>()    },
        { "future_parts",               std::make_shared<DataTypeUInt32>()   },
        { "parts_to_check",             std::make_shared<DataTypeUInt32>()   },
        { "zookeeper_path",             std::make_shared<DataTypeString>()   },
        { "replica_name",               std::make_shared<DataTypeString>()   },
        { "replica_path",               std::make_shared<DataTypeString>()   },
        { "columns_version",            std::make_shared<DataTypeInt32>()    },
        { "queue_size",                 std::make_shared<DataTypeUInt32>()   },
        { "inserts_in_queue",           std::make_shared<DataTypeUInt32>()   },
        { "merges_in_queue",            std::make_shared<DataTypeUInt32>()   },
        { "queue_oldest_time",          std::make_shared<DataTypeDateTime>() },
        { "inserts_oldest_time",        std::make_shared<DataTypeDateTime>() },
        { "merges_oldest_time",         std::make_shared<DataTypeDateTime>() },
        { "oldest_part_to_get",         std::make_shared<DataTypeString>()   },
        { "oldest_part_to_merge_to",    std::make_shared<DataTypeString>()   },
        { "log_max_index",              std::make_shared<DataTypeUInt64>()   },
        { "log_pointer",                std::make_shared<DataTypeUInt64>()   },
        { "last_queue_update",          std::make_shared<DataTypeDateTime>() },
        { "total_replicas",             std::make_shared<DataTypeUInt8>()    },
        { "active_replicas",            std::make_shared<DataTypeUInt8>()    },
    }
{
}
/** Factory function: builds a storage instance named `name_` and returns it as a StoragePtr.
  * NOTE(review): `make_shared` is not defined in this file; presumably it is a helper
  * made available by the class declaration - confirm against the header.
  */
StoragePtr StorageSystemReplicas::create(const std::string & name_)
{
    StoragePtr storage = make_shared(name_);
    return storage;
}
BlockInputStreams StorageSystemReplicas::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
/// Собираем набор реплицируемых таблиц.
Squashed commit of the following: commit f9b478181cd49224154cc350fb57df7121842f1c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 04:06:36 2016 +0300 Database engines: development [#METR-19997]. commit f7a10a67761ccfd05f3dac32d6444920cd8d4d60 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 03:44:37 2016 +0300 Database engines: development [#METR-19997]. commit bd98a8558e98bad2bed278e5762c4e0fc66e6f38 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 00:33:59 2016 +0300 Database engines: development [#METR-19997]. commit 19712fd884c22a4e2c2b67474086dea8f44e7c7b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Mar 19 00:03:11 2016 +0300 Database engines: development [#METR-19997]. commit 50274d6df7e91fcc34aab8a8c72347daa2c6512f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 23:24:57 2016 +0300 Database engines: development [#METR-19997]. commit 4a0b99b19b34e90ef8b7be2d199f6232e36ef3f7 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 22:50:36 2016 +0300 Database engines: development [#METR-19997]. commit 44ff3ebba7a3e460a27a89f31ddf199dbea1d182 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 15:09:17 2016 +0300 Database engines: development [#METR-19997]. commit 137c31f3004cfd282473b6acb01cbe1b4ca2aadd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 03:26:34 2016 +0300 Database engines: development [#METR-19997]. commit aa4c0496d4afe4a691164254be2bd5600542b38a Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 03:22:59 2016 +0300 Database engines: development [#METR-19997]. commit 5a94d1f0607450a2dac28a4d7df8b1393a864c23 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Mar 18 01:02:40 2016 +0300 Database engines: development [#METR-19997]. 
commit 50fd5b52ea1141955a5dfba0dcb191f3289ac25b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 23:23:40 2016 +0300 Database engines: development [#METR-19997]. commit a333d91b058e4f56dd83a6d2878c3c2bd8efc002 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 20:29:07 2016 +0300 Database engines: development [#METR-19997]. commit f81d366e7ac8348436f2698d040f8e341743a024 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Thu Mar 17 01:30:23 2016 +0300 Database engines: development [#METR-19997]. commit d0696860c9060827896214c08d147c759ea79376 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 21:55:31 2016 +0300 Database engines: development [#METR-19997]. commit 46a168c2ada140a0e95cd8d4b9d8ba9bac855d11 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 08:00:58 2016 +0300 Database engines: development [#METR-19997]. commit 20a2bad161454225fc1b5f9b919b842fbebc3231 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 06:51:10 2016 +0300 Database engines: development [#METR-19997]. commit ca0a77fcc2a8d0b276eb3743c53551ad3fe16314 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 06:02:20 2016 +0300 Reverted erroneous modification [#METR-19997]. commit 1370bdcc4594182f6ef2b146f9afabfe1c295080 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Wed Mar 16 00:41:34 2016 +0300 Database engines: development [#METR-19997]. commit 16e72c67041cae6471509d3f0f3d4a9aa7b7dc0f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Tue Mar 15 00:41:48 2016 +0300 Database engines: development [#METR-19997].
2016-03-19 01:18:49 +00:00
std::map<String, std::map<String, StoragePtr>> replicated_tables;
for (const auto & db : context.getDatabases())
for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next())
if (typeid_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
replicated_tables[db.first][iterator->name()] = iterator->table();
/// Нужны ли столбцы, требующие для вычисления хождение в ZooKeeper.
bool with_zk_fields = false;
for (const auto & name : column_names)
{
if ( name == "log_max_index"
|| name == "log_pointer"
|| name == "total_replicas"
|| name == "active_replicas")
{
with_zk_fields = true;
break;
}
}
ColumnWithTypeAndName col_database { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "database"};
ColumnWithTypeAndName col_table { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "table"};
ColumnWithTypeAndName col_engine { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "engine"};
for (auto & db : replicated_tables)
{
for (auto & table : db.second)
{
col_database.column->insert(db.first);
col_table.column->insert(table.first);
col_engine.column->insert(table.second->getName());
}
}
/// Определяем, какие нужны таблицы, по условиям в запросе.
{
Block filtered_block { col_database, col_table, col_engine };
VirtualColumnUtils::filterBlockWithQuery(query, filtered_block, context);
if (!filtered_block.rows())
return BlockInputStreams();
col_database = filtered_block.getByName("database");
col_table = filtered_block.getByName("table");
col_engine = filtered_block.getByName("engine");
}
ColumnWithTypeAndName col_is_leader { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "is_leader"};
ColumnWithTypeAndName col_is_readonly { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "is_readonly"};
ColumnWithTypeAndName col_is_session_expired{ std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "is_session_expired"};
ColumnWithTypeAndName col_future_parts { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "future_parts"};
ColumnWithTypeAndName col_parts_to_check { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "parts_to_check"};
ColumnWithTypeAndName col_zookeeper_path { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "zookeeper_path"};
ColumnWithTypeAndName col_replica_name { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "replica_name"};
ColumnWithTypeAndName col_replica_path { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "replica_path"};
ColumnWithTypeAndName col_columns_version { std::make_shared<ColumnInt32>(), std::make_shared<DataTypeInt32>(), "columns_version"};
ColumnWithTypeAndName col_queue_size { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "queue_size"};
ColumnWithTypeAndName col_inserts_in_queue { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "inserts_in_queue"};
ColumnWithTypeAndName col_merges_in_queue { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "merges_in_queue"};
ColumnWithTypeAndName col_queue_oldest_time { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "queue_oldest_time"};
ColumnWithTypeAndName col_inserts_oldest_time{ std::make_shared<ColumnUInt32>(),std::make_shared<DataTypeDateTime>(), "inserts_oldest_time"};
ColumnWithTypeAndName col_merges_oldest_time{ std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "merges_oldest_time"};
ColumnWithTypeAndName col_oldest_part_to_get{ std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "oldest_part_to_get"};
ColumnWithTypeAndName col_oldest_part_to_merge_to{ std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "oldest_part_to_merge_to"};
ColumnWithTypeAndName col_log_max_index { std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_max_index"};
ColumnWithTypeAndName col_log_pointer { std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "log_pointer"};
ColumnWithTypeAndName col_last_queue_update { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeDateTime>(), "last_queue_update"};
ColumnWithTypeAndName col_total_replicas { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "total_replicas"};
ColumnWithTypeAndName col_active_replicas { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "active_replicas"};
for (size_t i = 0, size = col_database.column->size(); i < size; ++i)
{
StorageReplicatedMergeTree::Status status;
typeid_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database.column)[i].safeGet<const String &>()]
[(*col_table.column)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
col_is_leader .column->insert(UInt64(status.is_leader));
col_is_readonly .column->insert(UInt64(status.is_readonly));
col_is_session_expired .column->insert(UInt64(status.is_session_expired));
2016-01-10 04:43:30 +00:00
col_future_parts .column->insert(UInt64(status.queue.future_parts));
col_parts_to_check .column->insert(UInt64(status.parts_to_check));
col_zookeeper_path .column->insert(status.zookeeper_path);
col_replica_name .column->insert(status.replica_name);
col_replica_path .column->insert(status.replica_path);
col_columns_version .column->insert(Int64(status.columns_version));
2016-01-10 04:43:30 +00:00
col_queue_size .column->insert(UInt64(status.queue.queue_size));
col_inserts_in_queue .column->insert(UInt64(status.queue.inserts_in_queue));
col_merges_in_queue .column->insert(UInt64(status.queue.merges_in_queue));
col_queue_oldest_time .column->insert(UInt64(status.queue.queue_oldest_time));
col_inserts_oldest_time .column->insert(UInt64(status.queue.inserts_oldest_time));
col_merges_oldest_time .column->insert(UInt64(status.queue.merges_oldest_time));
col_oldest_part_to_get .column->insert(status.queue.oldest_part_to_get);
col_oldest_part_to_merge_to.column->insert(status.queue.oldest_part_to_merge_to);
col_log_max_index .column->insert(status.log_max_index);
col_log_pointer .column->insert(status.log_pointer);
2016-01-10 04:43:30 +00:00
col_last_queue_update .column->insert(UInt64(status.queue.last_queue_update));
col_total_replicas .column->insert(UInt64(status.total_replicas));
col_active_replicas .column->insert(UInt64(status.active_replicas));
}
Block block{
col_database,
col_table,
col_engine,
col_is_leader,
col_is_readonly,
col_is_session_expired,
col_future_parts,
col_parts_to_check,
col_zookeeper_path,
col_replica_name,
col_replica_path,
col_columns_version,
col_queue_size,
col_inserts_in_queue,
col_merges_in_queue,
2014-11-08 23:55:24 +00:00
col_queue_oldest_time,
col_inserts_oldest_time,
col_merges_oldest_time,
col_oldest_part_to_get,
col_oldest_part_to_merge_to,
col_log_max_index,
col_log_pointer,
col_last_queue_update,
col_total_replicas,
col_active_replicas,
};
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
}
}