ClickHouse/dbms/src/Storages/StorageReplicatedMergeTree.cpp

359 lines
12 KiB
C++
Raw Normal View History

2014-03-21 13:42:14 +00:00
#include <DB/Storages/StorageReplicatedMergeTree.h>
2014-04-02 10:10:37 +00:00
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
2014-03-22 14:44:44 +00:00
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
2014-03-21 13:42:14 +00:00
namespace DB
{
2014-03-22 14:44:44 +00:00
/** Placeholder handler for interserver requests.
  * Replies with a plain-text message echoing the value of the "part" request parameter.
  */
void StorageReplicatedMergeTree::MyInterserverIOEndpoint::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out)
{
	writeString("Hello. You requested part ", out);

	const String & requested_part = params.get("part");
	writeString(requested_part, out);

	writeChar('.', out);
}
2014-03-21 13:42:14 +00:00
/** Creates (attach == false) or attaches (attach == true) a replica of a replicated MergeTree table.
  *
  * zookeeper_path_ - ZooKeeper node shared by all replicas of the table (a single trailing '/' is stripped).
  * replica_name_   - name of this replica; its node is <zookeeper_path>/replicas/<replica_name>.
  * attach          - false: register a new replica, creating the table node first if it does not exist;
  *                   true:  verify the existing replica against ZooKeeper and the local parts.
  * The remaining parameters are forwarded to the underlying MergeTreeData.
  *
  * On success, the interserver IO endpoint is registered and the replica is marked active.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()),
	path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_),
	reader(data), writer(data),
	log(&Logger::get("StorageReplicatedMergeTree")),
	shutdown_called(false)
{
	/// Drop one trailing slash so that the derived paths below do not contain "//".
	const size_t path_length = zookeeper_path.size();
	if (path_length != 0 && zookeeper_path[path_length - 1] == '/')
		zookeeper_path.resize(path_length - 1);

	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (attach)
	{
		/// Existing replica: metadata in ZooKeeper and local parts must agree with what we have.
		checkTableStructure();
		checkParts();
	}
	else
	{
		if (!zookeeper.exists(zookeeper_path))
			createTable();

		/// A new replica can only join a table that holds no data yet.
		if (!isTableEmpty())
			throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

		checkTableStructure();
		createReplica();
	}

	String endpoint_id = "ReplicatedMergeTree:" + replica_path;
	InterserverIOEndpointPtr io_endpoint = new MyInterserverIOEndpoint(*this);
	endpoint_holder = new InterserverIOEndpointHolder(endpoint_id, io_endpoint, context.getInterserverIOHandler());

	activateReplica();
}
/** Factory: constructs the storage, obtains its owning StoragePtr via thisPtr(), and hands that pointer
  * to the interserver IO endpoint (setOwnedStorage) before returning it.
  * Parameters are forwarded unchanged to the constructor.
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageReplicatedMergeTree * storage = new StorageReplicatedMergeTree(
		zookeeper_path_, replica_name_, attach,
		path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);

	StoragePtr owning_ptr = storage->thisPtr();

	/// The endpoint was created in the constructor as a MyInterserverIOEndpoint.
	/// NOTE(review): setOwnedStorage presumably lets the endpoint keep the storage alive while serving requests - confirm.
	MyInterserverIOEndpoint & io_endpoint = dynamic_cast<MyInterserverIOEndpoint &>(*storage->endpoint_holder->getEndpoint());
	io_endpoint.setOwnedStorage(owning_ptr);

	return owning_ptr;
}
2014-03-22 14:44:44 +00:00
/** Renders an AST to a string with formatAST(*ast, stream, 0, false, true).
  * Used to store expressions in, and compare them against, the table metadata kept in ZooKeeper.
  * A null AST yields an empty string.
  */
static String formattedAST(const ASTPtr & ast)
{
	if (ast)
	{
		std::ostringstream text;
		formatAST(*ast, text, 0, false, true);
		return text.str();
	}

	return "";
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::createTable()
2014-03-21 19:17:59 +00:00
{
2014-03-22 14:44:44 +00:00
zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
std::stringstream metadata;
metadata << "metadata format version: 1" << std::endl;
metadata << "date column: " << data.date_column_name << std::endl;
metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
metadata << "index granularity: " << data.index_granularity << std::endl;
metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
metadata << "sign column: " << data.sign_column << std::endl;
metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
metadata << "columns:" << std::endl;
WriteBufferFromOStream buf(metadata);
for (auto & it : data.getColumnsList())
{
writeBackQuotedString(it.first, buf);
writeChar(' ', buf);
writeString(it.second->getName(), buf);
writeChar('\n', buf);
}
buf.next();
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/// Создадим нужные "директории".
zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent);
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
* Если нет - бросить исключение.
2014-03-21 19:17:59 +00:00
*/
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::checkTableStructure()
{
String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
ReadBufferFromString buf(metadata_str);
assertString("metadata format version: 1", buf);
assertString("\ndate column: ", buf);
assertString(data.date_column_name, buf);
assertString("\nsampling expression: ", buf);
assertString(formattedAST(data.sampling_expression), buf);
assertString("\nindex granularity: ", buf);
assertString(toString(data.index_granularity), buf);
assertString("\nmode: ", buf);
assertString(toString(static_cast<int>(data.mode)), buf);
assertString("\nsign column: ", buf);
assertString(data.sign_column, buf);
assertString("\nprimary key: ", buf);
assertString(formattedAST(data.primary_expr_ast), buf);
assertString("\ncolumns:\n", buf);
for (auto & it : data.getColumnsList())
{
String name;
readBackQuotedString(name, buf);
if (name != it.first)
throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
ErrorCodes::UNKNOWN_IDENTIFIER);
assertString(" ", buf);
assertString(it.second->getName(), buf);
assertString("\n", buf);
}
2014-04-02 07:59:43 +00:00
assertEOF(buf);
2014-03-22 14:44:44 +00:00
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::createReplica()
{
zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
zookeeper.create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
void StorageReplicatedMergeTree::activateReplica()
{
std::stringstream host;
host << "host: " << context.getInterserverIOHost() << std::endl;
host << "port: " << context.getInterserverIOPort() << std::endl;
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
/// Одновременно объявим, что эта реплика активна и обновим хост.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
zookeeper.multi(ops);
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
2014-03-21 19:17:59 +00:00
2014-03-22 14:44:44 +00:00
bool StorageReplicatedMergeTree::isTableEmpty()
{
Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
for (const auto & replica : replicas)
{
if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
return false;
}
return true;
}
2014-03-21 19:17:59 +00:00
2014-04-02 07:59:43 +00:00
void StorageReplicatedMergeTree::checkParts()
{
Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getDataParts();
MergeTreeData::DataPartsVector unexpected_parts;
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
{
expected_parts.erase(part->name);
}
else
{
unexpected_parts.push_back(part);
}
}
if (!expected_parts.empty())
throw Exception("Not found " + toString(expected_parts.size())
2014-04-02 10:10:37 +00:00
+ " parts (including " + *expected_parts.begin() + ") in table " + data.getTableName(),
2014-04-02 07:59:43 +00:00
ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
if (unexpected_parts.size() > 1)
throw Exception("More than one unexpected part (including " + unexpected_parts[0]->name
+ ") in table " + data.getTableName(),
ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
for (MergeTreeData::DataPartPtr part : unexpected_parts)
{
LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
2014-04-02 10:10:37 +00:00
data.renameAndDetachPart(part, "ignored_");
2014-04-02 07:59:43 +00:00
}
}
2014-03-21 19:17:59 +00:00
/// Replication queue processing and part fetching are not implemented yet:
/// each of the following methods throws NOT_IMPLEMENTED.

void StorageReplicatedMergeTree::loadQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::pullLogsToQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::optimizeQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::executeSomeQueueEntry()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

bool StorageReplicatedMergeTree::tryExecute(const LogEntry & /*entry*/)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

String StorageReplicatedMergeTree::findReplicaHavingPart(const String & /*part_name*/)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::getPart(const String & /*name*/, const String & /*replica_name*/)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
2014-03-22 14:44:44 +00:00
/** Releases this replica's runtime registrations.
  * Resets the is_active node holder first, then the interserver IO endpoint holder.
  * Only the first call has any effect.
  */
void StorageReplicatedMergeTree::shutdown()
{
	if (!shutdown_called)
	{
		shutdown_called = true;

		replica_is_active_node = nullptr;
		endpoint_holder = nullptr;

		/// It seems that, to rule out a deadlock, we will have to wait here until MyInterserverIOEndpoint is destroyed.
	}
}
/** Destructor: runs shutdown() (which does nothing if it already ran).
  * An exception must not escape a destructor, so any error is logged and swallowed.
  */
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		/// Best-effort cleanup: log and continue destruction.
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}
/** Reads from the local copy of the data: all arguments are forwarded unchanged to `reader`,
  * the reader constructed over `data`.
  */
BlockInputStreams StorageReplicatedMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	return reader.read(
		column_names,
		query,
		settings,
		processed_stage,
		max_block_size,
		threads);
}
2014-04-02 10:10:37 +00:00
/** Returns an output stream that inserts blocks into this replicated table.
  * If `query` is an INSERT query, its insert_id is passed to the stream; otherwise
  * (including a null query) an empty insert_id is used.
  * NOTE(review): insert_id is presumably a client-supplied key for deduplicating retried inserts - confirm.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	String insert_id;

	/// Use query.get() instead of &*query: dereferencing a null ASTPtr is invalid, whereas
	/// dynamic_cast of a null pointer simply yields null.
	if (const ASTInsertQuery * insert = dynamic_cast<const ASTInsertQuery *>(query.get()))
		insert_id = insert->insert_id;

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
2014-03-22 14:44:44 +00:00
/** Removes this replica's subtree from ZooKeeper and, if no replicas remain,
  * removes the whole table node <zookeeper_path>.
  * NOTE(review): the "no replicas left" check and the removal are separate ZooKeeper calls, so
  * concurrent drops or replica creation could race - confirm whether callers serialize this.
  */
void StorageReplicatedMergeTree::drop()
{
	/// Release the is_active holder before deleting the replica subtree.
	replica_is_active_node = nullptr;
	zookeeper.removeRecursive(replica_path);

	const bool no_replicas_left = zookeeper.getChildren(zookeeper_path + "/replicas").empty();
	if (no_replicas_left)
		zookeeper.removeRecursive(zookeeper_path);
}
2014-04-02 10:10:37 +00:00
/** Serializes the log entry as text:
  *
  *   format version: 1\n
  *   get\n<new_part_name>\n
  * or
  *   format version: 1\n
  *   merge\n<part>\n...<part>\ninto\n<new_part_name>\n
  *
  * For any other `type`, only the version line and the final '\n' are written.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}
/** Parses a log entry in the text format produced by writeText():
  *
  *   format version: 1\n
  *   get\n<new_part_name>\n
  * or
  *   format version: 1\n
  *   merge\n<part>\n...<part>\ninto\n<new_part_name>\n
  *
  * Throws on an unknown entry type instead of leaving `type` unset.
  * parts_to_merge is cleared first, so reusing a LogEntry does not accumulate parts from a previous entry.
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	readString(type_str, in);
	assertString("\n", in);

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;
		parts_to_merge.clear();

		/// Source part names, one per line, up to the "into" marker line.
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
		throw Exception("Unknown log entry type: " + type_str, ErrorCodes::CANNOT_PARSE_TEXT);

	assertString("\n", in);
}
2014-03-21 13:42:14 +00:00
}