mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge
This commit is contained in:
parent
2809291432
commit
7084f03ab9
@ -229,6 +229,8 @@ namespace ErrorCodes
|
||||
DATABASE_NOT_EMPTY,
|
||||
DUPLICATE_INTERSERVER_IO_ENDPOINT,
|
||||
NO_SUCH_INTERSERVER_IO_ENDPOINT,
|
||||
ADDING_REPLICA_TO_NON_EMPTY_TABLE,
|
||||
UNEXPECTED_AST_STRUCTURE,
|
||||
|
||||
POCO_EXCEPTION = 1000,
|
||||
STD_EXCEPTION,
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <DB/Interpreters/InterserverIOHandler.h>
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
#include <statdaemons/ConfigProcessor.h>
|
||||
#include <zkutil/ZooKeeper.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -72,6 +73,8 @@ struct ContextShared
|
||||
|
||||
mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов.
|
||||
|
||||
mutable SharedPtr<zkutil::ZooKeeper> zookeeper; /// Клиент для ZooKeeper.
|
||||
|
||||
String path; /// Путь к директории с данными, со слешем на конце.
|
||||
Databases databases; /// Список БД и таблиц в них.
|
||||
TableFunctionFactory table_function_factory; /// Табличные функции.
|
||||
@ -293,6 +296,9 @@ public:
|
||||
void setUncompressedCache(size_t cache_size_in_cells);
|
||||
UncompressedCachePtr getUncompressedCache() const;
|
||||
|
||||
void setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper);
|
||||
zkutil::ZooKeeper * getZooKeeper() const;
|
||||
|
||||
/// Создать кэш засечек указанного размера. Это можно сделать только один раз.
|
||||
void setMarkCache(size_t cache_size_in_bytes);
|
||||
MarkCachePtr getMarkCache() const;
|
||||
|
@ -5,16 +5,22 @@
|
||||
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||
#include <zkutil/ZooKeeper.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Движок, использующий merge-дерево и реплицируемый через ZooKeeper.
|
||||
*/
|
||||
class StorageReplicatedMergeTree : public IStorage
|
||||
{
|
||||
public:
|
||||
/** Если !attach, либо создает новую таблицу в ZK, либо добавляет реплику в существующую таблицу.
|
||||
*/
|
||||
static StoragePtr create(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
ASTPtr & primary_expr_ast_,
|
||||
@ -28,18 +34,18 @@ public:
|
||||
void shutdown();
|
||||
~StorageReplicatedMergeTree();
|
||||
|
||||
std::string getName() const
|
||||
std::string getName() const override
|
||||
{
|
||||
return "Replicated" + data.getModePrefix() + "MergeTree";
|
||||
}
|
||||
|
||||
std::string getTableName() const { return name; }
|
||||
std::string getTableName() const override { return name; }
|
||||
std::string getSignColumnName() const { return data.getSignColumnName(); }
|
||||
bool supportsSampling() const { return data.supportsSampling(); }
|
||||
bool supportsFinal() const { return data.supportsFinal(); }
|
||||
bool supportsPrewhere() const { return data.supportsPrewhere(); }
|
||||
bool supportsSampling() const override { return data.supportsSampling(); }
|
||||
bool supportsFinal() const override { return data.supportsFinal(); }
|
||||
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const { return data.getColumnsList(); }
|
||||
const NamesAndTypesList & getColumnsList() const override { return data.getColumnsList(); }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
@ -47,10 +53,12 @@ public:
|
||||
const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size = DEFAULT_BLOCK_SIZE,
|
||||
unsigned threads = 1);
|
||||
unsigned threads = 1) override;
|
||||
|
||||
BlockOutputStreamPtr write(ASTPtr query);
|
||||
BlockOutputStreamPtr write(ASTPtr query) override;
|
||||
|
||||
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
|
||||
*/
|
||||
void drop() override;
|
||||
|
||||
private:
|
||||
@ -71,7 +79,7 @@ private:
|
||||
|
||||
typedef std::list<LogEntry> LogEntries;
|
||||
|
||||
/** "Очередь" того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из Zookeeper (/replicas/me/queue/).
|
||||
/** "Очередь" того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
|
||||
* В ZK записи в хронологическом порядке. Здесь записи в том порядке, в котором их лучше выполнять.
|
||||
*/
|
||||
LogEntries queue;
|
||||
@ -84,14 +92,16 @@ private:
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
|
||||
/** Является ли эта реплика "главной". Главная реплика выбирает куски для слияния.
|
||||
/** Является ли эта реплика "ведущей". Ведущая реплика выбирает куски для слияния.
|
||||
*/
|
||||
bool is_master_node;
|
||||
bool is_leader_node;
|
||||
|
||||
MergeTreeData data;
|
||||
MergeTreeDataSelectExecutor reader;
|
||||
MergeTreeDataWriter writer;
|
||||
|
||||
zkutil::ZooKeeper * zookeeper;
|
||||
|
||||
Logger * log;
|
||||
|
||||
volatile bool shutdown_called;
|
||||
@ -99,6 +109,7 @@ private:
|
||||
StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
ASTPtr & primary_expr_ast_,
|
||||
@ -109,6 +120,17 @@ private:
|
||||
const String & sign_column_ = "",
|
||||
const MergeTreeSettings & settings_ = MergeTreeSettings());
|
||||
|
||||
/// Инициализация.
|
||||
|
||||
bool isTableExistsInZooKeeper();
|
||||
bool isTableEmptyInZooKeeper();
|
||||
|
||||
void createNewTableInZooKeeper();
|
||||
void createNewReplicaInZooKeeper();
|
||||
|
||||
void removeReplicaInZooKeeper();
|
||||
void removeTableInZooKeeper();
|
||||
|
||||
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
|
||||
* Если нет - бросить исключение.
|
||||
*/
|
||||
@ -121,7 +143,9 @@ private:
|
||||
*/
|
||||
void checkParts();
|
||||
|
||||
/** Кладет в queue записи из Zookeeper (/replicas/me/queue/).
|
||||
/// Работа с очередью и логом.
|
||||
|
||||
/** Кладет в queue записи из ZooKeeper (/replicas/me/queue/).
|
||||
*/
|
||||
void loadQueue();
|
||||
|
||||
@ -145,6 +169,14 @@ private:
|
||||
* - Не смогли объединить куски, потому что не все из них у нас есть.
|
||||
*/
|
||||
bool tryExecute(const LogEntry & entry);
|
||||
|
||||
/// Обмен кусками.
|
||||
|
||||
void registerEndpoint();
|
||||
void unregisterEndpoint();
|
||||
|
||||
String findReplicaHavingPart(const String & part_name);
|
||||
void getPart(const String & name, const String & replica_name);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -474,6 +474,22 @@ MarkCachePtr Context::getMarkCache() const
|
||||
return shared->mark_cache;
|
||||
}
|
||||
|
||||
void Context::setZooKeeper(SharedPtr<zkutil::ZooKeeper> zookeeper)
|
||||
{
|
||||
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
||||
|
||||
if (shared->zookeeper)
|
||||
throw Exception("ZooKeeper client has already been set.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
shared->zookeeper = zookeeper;
|
||||
}
|
||||
|
||||
zkutil::ZooKeeper * Context::getZooKeeper() const
|
||||
{
|
||||
return shared->zookeeper.get();
|
||||
}
|
||||
|
||||
|
||||
void Context::initClusters()
|
||||
{
|
||||
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
||||
|
@ -213,6 +213,9 @@ int Server::main(const std::vector<std::string> & args)
|
||||
global_context->setGlobalContext(*global_context);
|
||||
global_context->setPath(config.getString("path"));
|
||||
|
||||
if (config.has("zookeeper"))
|
||||
global_context->setZooKeeper(new zkutil::ZooKeeper(config, "zookeeper"));
|
||||
|
||||
std::string users_config_path = config.getString("users_config", config.getString("config-file", "config.xml"));
|
||||
users_config_reloader = new UsersConfigReloader(users_config_path, global_context);
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <DB/Storages/StorageChunks.h>
|
||||
#include <DB/Storages/StorageChunkRef.h>
|
||||
#include <DB/Storages/StorageChunkMerger.h>
|
||||
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
||||
|
||||
#include <DB/DataTypes/DataTypeArray.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
@ -29,6 +30,15 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static bool endsWith(const std::string & s, const std::string & suffix)
|
||||
{
|
||||
return s.size() >= suffix.size() && s.substr(s.size() - suffix.size()) == suffix;
|
||||
}
|
||||
|
||||
static bool startsWith(const std::string & s, const std::string & prefix)
|
||||
{
|
||||
return s.size() >= prefix.size() && s.substr(0, prefix.size()) == prefix;
|
||||
}
|
||||
|
||||
/** Для StorageMergeTree: достать первичный ключ в виде ASTExpressionList.
|
||||
* Он может быть указан в кортеже: (CounterID, Date),
|
||||
@ -176,80 +186,121 @@ StoragePtr StorageFactory::get(
|
||||
return StorageDistributed::create(table_name, columns, remote_database, remote_table, cluster_name,
|
||||
context, sign_column_name);
|
||||
}
|
||||
else if (name == "MergeTree" || name == "SummingMergeTree")
|
||||
else if (endsWith(name, "MergeTree"))
|
||||
{
|
||||
/** В качестве аргумента для движка должно быть указано:
|
||||
/** Движки [Replicated][Summing|Collapsing]MergeTree (6 комбинаций)
|
||||
* В качестве аргумента для движка должно быть указано:
|
||||
* - (для Replicated) Путь к таблице в ZooKeeper
|
||||
* - (для Replicated) Имя реплики в ZooKeeper
|
||||
* - имя столбца с датой;
|
||||
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
|
||||
* - выражение для сортировки в скобках;
|
||||
* - index_granularity.
|
||||
* Например: ENGINE = MergeTree(EventDate, intHash32(UniqID), (CounterID, EventDate, intHash32(UniqID), EventTime), 8192).
|
||||
*
|
||||
* SummingMergeTree - вариант, в котором при слиянии делается суммирование всех числовых столбцов кроме PK
|
||||
* - для Баннерной Крутилки.
|
||||
*/
|
||||
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
|
||||
|
||||
if (args_func.size() != 1)
|
||||
throw Exception("Storage " + name + " requires 3 or 4 parameters"
|
||||
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
|
||||
|
||||
if (args.size() != 3 && args.size() != 4)
|
||||
throw Exception("Storage " + name + " requires 3 or 4 parameters"
|
||||
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
size_t arg_offset = args.size() - 3;
|
||||
|
||||
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
|
||||
ASTPtr sampling_expression = arg_offset == 0 ? NULL : args[1];
|
||||
UInt64 index_granularity = safeGet<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
|
||||
|
||||
ASTPtr primary_expr_list = extractPrimaryKey(args[arg_offset + 1], name);
|
||||
|
||||
return StorageMergeTree::create(
|
||||
data_path, table_name, columns, context, primary_expr_list, date_column_name, sampling_expression, index_granularity,
|
||||
name == "SummingMergeTree" ? MergeTreeData::Summing : MergeTreeData::Ordinary);
|
||||
}
|
||||
else if (name == "CollapsingMergeTree")
|
||||
{
|
||||
/** В качестве аргумента для движка должно быть указано:
|
||||
* - имя столбца с датой;
|
||||
* - имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
|
||||
* - выражение для сортировки в скобках;
|
||||
* - (не обязательно) имя столбца для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
|
||||
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
|
||||
* - index_granularity;
|
||||
* - имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
|
||||
* Например: ENGINE = CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
|
||||
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
|
||||
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
|
||||
*/
|
||||
|
||||
String name_part = name.substr(0, name.size() - strlen("MergeTree"));
|
||||
|
||||
bool replicated = startsWith(name_part, "Replicated");
|
||||
if (replicated)
|
||||
name_part = name_part.substr(strlen("Replicated"));
|
||||
|
||||
MergeTreeData::Mode mode = MergeTreeData::Ordinary;
|
||||
|
||||
if (name_part == "Collapsing")
|
||||
mode = MergeTreeData::Collapsing;
|
||||
else if (name_part == "Summing")
|
||||
mode = MergeTreeData::Summing;
|
||||
else if (!name_part.empty())
|
||||
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);
|
||||
|
||||
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
|
||||
|
||||
if (args_func.size() != 1)
|
||||
throw Exception("Storage CollapsingMergeTree requires 4 or 5 parameters"
|
||||
" - name of column with date, [name of column for sampling], primary key expression, index granularity, sign_column.",
|
||||
throw Exception("Unexpected AST structure.",
|
||||
ErrorCodes::UNEXPECTED_AST_STRUCTURE);
|
||||
|
||||
ASTs args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
|
||||
|
||||
size_t additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing ? 1 : 0);
|
||||
if (args.size() != additional_params + 3 && args.size() != additional_params + 4)
|
||||
{
|
||||
String params;
|
||||
if (replicated)
|
||||
params += "path in ZooKeeper, replica name, ";
|
||||
params += "name of column with date, [name of column for sampling], primary key expression, index granularity";
|
||||
if (mode == MergeTreeData::Collapsing)
|
||||
params += "sign column";
|
||||
throw Exception("Storage CollapsingMergeTree requires " + toString(additional_params + 3) + " or "
|
||||
+ toString(additional_params + 4) +" parameters: " + params,
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
}
|
||||
|
||||
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
|
||||
if (args.size() != 4 && args.size() != 5)
|
||||
throw Exception("Storage CollapsingMergeTree requires 4 or 5 parameters"
|
||||
" - name of column with date, [name of column for sampling], primary key expression, index granularity, sign_column.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
String date_column_name;
|
||||
ASTPtr primary_expr_list;
|
||||
ASTPtr sampling_expression;
|
||||
UInt64 index_granularity;
|
||||
|
||||
size_t arg_offset = args.size() - 4;
|
||||
|
||||
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
|
||||
ASTPtr sampling_expression = arg_offset == 0 ? NULL : args[1];
|
||||
UInt64 index_granularity = safeGet<UInt64>(dynamic_cast<ASTLiteral &>(*args[arg_offset + 2]).value);
|
||||
String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[arg_offset + 3]).name;
|
||||
String sign_column_name;
|
||||
|
||||
ASTPtr primary_expr_list = extractPrimaryKey(args[arg_offset + 1], name);
|
||||
if (replicated)
|
||||
{
|
||||
auto ast = dynamic_cast<ASTLiteral *>(&*args[0]);
|
||||
if (ast && ast->value.getType() == Field::Types::String)
|
||||
zookeeper_path = safeGet<String>(ast->value);
|
||||
else
|
||||
throw Exception("Path in ZooKeeper must be a string literal", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
return StorageMergeTree::create(
|
||||
data_path, table_name, columns, context, primary_expr_list, date_column_name,
|
||||
sampling_expression, index_granularity, MergeTreeData::Collapsing, sign_column_name);
|
||||
ast = dynamic_cast<ASTLiteral *>(&*args[1]);
|
||||
if (ast && ast->value.getType() == Field::Types::String)
|
||||
replica_name = safeGet<String>(ast->value);
|
||||
else
|
||||
throw Exception("Replica name must be a string literal", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
args.erase(args.begin(), args.begin() + 2);
|
||||
}
|
||||
|
||||
if (mode == MergeTreeData::Collapsing)
|
||||
{
|
||||
if (auto ast = dynamic_cast<ASTIdentifier *>(&*args.back()))
|
||||
sign_column_name = ast->name;
|
||||
else
|
||||
throw Exception("Sign column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
args.pop_back();
|
||||
}
|
||||
|
||||
if (args.size() == 4)
|
||||
{
|
||||
sampling_expression = args[1];
|
||||
args.erase(args.begin() + 1);
|
||||
}
|
||||
|
||||
if (auto ast = dynamic_cast<ASTIdentifier *>(&*args[0]))
|
||||
date_column_name = ast->name;
|
||||
else
|
||||
throw Exception("Date column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
primary_expr_list = extractPrimaryKey(args[1], name);
|
||||
|
||||
auto ast = dynamic_cast<ASTLiteral *>(&*args[2]);
|
||||
if (ast && ast->value.getType() == Field::Types::UInt64)
|
||||
index_granularity = safeGet<UInt64>(ast->value);
|
||||
else
|
||||
throw Exception("Index granularity must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (replicated)
|
||||
return StorageReplicatedMergeTree::create(zookeeper_path, replica_name, attach, data_path, table_name,
|
||||
columns, context, primary_expr_list, date_column_name,
|
||||
sampling_expression, index_granularity, mode, sign_column_name);
|
||||
else
|
||||
return StorageMergeTree::create(data_path, table_name,
|
||||
columns, context, primary_expr_list, date_column_name,
|
||||
sampling_expression, index_granularity, mode, sign_column_name);
|
||||
}
|
||||
else if (name == "SystemNumbers")
|
||||
{
|
||||
|
@ -6,6 +6,7 @@ namespace DB
|
||||
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
ASTPtr & primary_expr_ast_,
|
||||
@ -21,15 +22,34 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
|
||||
index_granularity_,mode_, sign_column_, settings_),
|
||||
reader(data), writer(data),
|
||||
log(&Logger::get("StorageReplicatedMergeTree")),
|
||||
zookeeper(context_.getZooKeeper()), log(&Logger::get("StorageReplicatedMergeTree")),
|
||||
shutdown_called(false)
|
||||
{
|
||||
|
||||
if (!attach)
|
||||
{
|
||||
if (isTableExistsInZooKeeper())
|
||||
{
|
||||
if (!isTableEmptyInZooKeeper())
|
||||
throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);
|
||||
checkTableStructure();
|
||||
createNewReplicaInZooKeeper();
|
||||
}
|
||||
else
|
||||
{
|
||||
createNewTableInZooKeeper();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
checkTableStructure();
|
||||
checkParts();
|
||||
}
|
||||
}
|
||||
|
||||
StoragePtr StorageReplicatedMergeTree::create(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
|
||||
const Context & context_,
|
||||
ASTPtr & primary_expr_ast_,
|
||||
@ -40,8 +60,85 @@ StoragePtr StorageReplicatedMergeTree::create(
|
||||
const String & sign_column_,
|
||||
const MergeTreeSettings & settings_)
|
||||
{
|
||||
return (new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, path_, name_, columns_, context_, primary_expr_ast_,
|
||||
return (new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach, path_, name_, columns_, context_, primary_expr_ast_,
|
||||
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_))->thisPtr();
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::shutdown() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
BlockInputStreams StorageReplicatedMergeTree::read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
const Settings & settings,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned threads) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::drop() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
bool StorageReplicatedMergeTree::isTableExistsInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
bool StorageReplicatedMergeTree::isTableEmptyInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
void StorageReplicatedMergeTree::createNewTableInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
void StorageReplicatedMergeTree::createNewReplicaInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
void StorageReplicatedMergeTree::removeReplicaInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
void StorageReplicatedMergeTree::removeTableInZooKeeper() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
|
||||
* Если нет - бросить исключение.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::checkTableStructure() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Проверить, что множество кусков соответствует тому, что в ZK (/replicas/me/parts/).
|
||||
* Если каких-то кусков, описанных в ZK нет локально, бросить исключение.
|
||||
* Если какие-то локальные куски не упоминаются в ZK, удалить их.
|
||||
* Но если таких слишком много, на всякий случай бросить исключение - скорее всего, это ошибка конфигурации.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::checkParts() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/// Работа с очередью и логом.
|
||||
|
||||
/** Кладет в queue записи из ZooKeeper (/replicas/me/queue/).
|
||||
*/
|
||||
void StorageReplicatedMergeTree::loadQueue() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Копирует новые записи из логов всех реплик в очередь этой реплики.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::pullLogsToQueue() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Делает преобразования над очередью:
|
||||
* - Если есть MERGE_PARTS кусков, не все из которых у нас есть, заменяем его на GET_PART и
|
||||
* убираем GET_PART для всех составляющих его кусков. NOTE: Наверно, это будет плохо работать. Придумать эвристики получше.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::optimizeQueue() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** По порядку пытается выполнить действия из очереди, пока не получится. Что получилось, выбрасывает из очереди.
|
||||
*/
|
||||
void StorageReplicatedMergeTree::executeSomeQueueEntry() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/** Попробовать выполнить действие из очереди. Возвращает false, если не получилось по какой-то ожидаемой причине:
|
||||
* - GET_PART, и ни у кого нет этого куска. Это возможно, если этот кусок уже слили с кем-то и удалили.
|
||||
* - Не смогли скачать у кого-то кусок, потому что его там уже нет.
|
||||
* - Не смогли объединить куски, потому что не все из них у нас есть.
|
||||
*/
|
||||
bool StorageReplicatedMergeTree::tryExecute(const LogEntry & entry) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
/// Обмен кусками.
|
||||
|
||||
void StorageReplicatedMergeTree::registerEndpoint() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
void StorageReplicatedMergeTree::unregisterEndpoint() { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
void StorageReplicatedMergeTree::getPart(const String & name, const String & replica_name) { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
|
||||
|
||||
}
|
||||
|
@ -68,7 +68,6 @@ struct ZooKeeperArgs
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
std::string node_key = "node";
|
||||
std::string node_key_ext = "node[";
|
||||
|
||||
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
|
||||
for (const auto & key : keys)
|
||||
|
Loading…
Reference in New Issue
Block a user