2020-04-05 12:18:51 +00:00
|
|
|
#include <iomanip>
|
|
|
|
|
|
|
|
#include <Core/Settings.h>
|
|
|
|
#include <Databases/DatabaseOnDisk.h>
|
2020-05-05 14:16:59 +00:00
|
|
|
#include <Databases/DatabaseOrdinary.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <Databases/DatabaseReplicated.h>
|
|
|
|
#include <Databases/DatabasesCommon.h>
|
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <IO/ReadBufferFromString.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/WriteBufferFromFile.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Interpreters/Context.h>
|
|
|
|
#include <Interpreters/InterpreterCreateQuery.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <Interpreters/executeQuery.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <Parsers/ASTCreateQuery.h>
|
|
|
|
#include <Parsers/ASTSetQuery.h>
|
|
|
|
#include <Parsers/ParserCreateQuery.h>
|
|
|
|
#include <Parsers/formatAST.h>
|
|
|
|
#include <Parsers/parseQuery.h>
|
|
|
|
#include <Storages/StorageFactory.h>
|
|
|
|
#include <TableFunctions/TableFunctionFactory.h>
|
|
|
|
|
|
|
|
#include <Parsers/queryToString.h>
|
|
|
|
|
|
|
|
#include <Poco/DirectoryIterator.h>
|
|
|
|
#include <Poco/Event.h>
|
|
|
|
#include <Common/Stopwatch.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <Common/setThreadName.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <Common/ThreadPool.h>
|
|
|
|
#include <Common/escapeForFileName.h>
|
|
|
|
#include <Common/quoteString.h>
|
|
|
|
#include <Common/typeid_cast.h>
|
|
|
|
#include <common/logger_useful.h>
|
|
|
|
|
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
|
|
|
#include <Common/ZooKeeper/Types.h>
|
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <Common/ZooKeeper/Lock.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
|
|
|
|
#include <ext/scope_guard.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <common/sleep.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
/// Error codes referenced in this translation unit; they are only declared here.
namespace ErrorCodes
{
    extern const int NO_ZOOKEEPER;
}
|
|
|
|
|
|
|
|
/// Replaces the ZooKeeper handle stored in this database.
/// The assignment is done while holding current_zookeeper_mutex, the same mutex
/// tryGetZooKeeper() takes when reading the handle.
void DatabaseReplicated::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
    std::lock_guard lock(current_zookeeper_mutex);
    /// `zookeeper` is taken by value and not used afterwards, so hand it over instead of copying it again.
    current_zookeeper = std::move(zookeeper);
}
|
|
|
|
|
|
|
|
/// Returns a copy of the stored ZooKeeper handle, read under current_zookeeper_mutex.
/// The result may be empty; getZooKeeper() is the variant that throws in that case.
zkutil::ZooKeeperPtr DatabaseReplicated::tryGetZooKeeper() const
{
    std::lock_guard guard(current_zookeeper_mutex);
    return current_zookeeper;
}
|
|
|
|
|
|
|
|
/// Returns the stored ZooKeeper handle.
/// Throws Exception with ErrorCodes::NO_ZOOKEEPER when tryGetZooKeeper() yields an empty handle.
zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const
{
    if (auto zookeeper = tryGetZooKeeper())
        return zookeeper;

    throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Constructs the database on top of DatabaseOrdinary, normalizes the ZooKeeper path,
/// makes sure the root node exists in ZooKeeper and starts the background worker thread.
///
/// name_, metadata_path_, context_ are forwarded to DatabaseOrdinary.
/// zookeeper_path_ is the root node of this database in ZooKeeper (normalized below).
/// replica_name_ is used to build replica_path as <zookeeper_path>/replicas/<replica_name>.
///
/// Throws Exception with ErrorCodes::NO_ZOOKEEPER if no ZooKeeper handle could be obtained from context_.
DatabaseReplicated::DatabaseReplicated(
    const String & name_,
    const String & metadata_path_,
    const String & zookeeper_path_,
    const String & replica_name_,
    Context & context_)
// : DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseReplicated (" + name_ + ")", context_)
// TODO add constructor to Atomic and call it here with path and logger name specification
// TODO ask why const and & are omitted in Atomic
    : DatabaseOrdinary(name_, metadata_path_, context_)
    , context(context_)
    , zookeeper_path(zookeeper_path_)
    , replica_name(replica_name_)
{
    /// Drop a single trailing slash, if any.
    if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
        zookeeper_path.pop_back();

    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
    if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
        zookeeper_path.insert(0, "/");

    replica_path = zookeeper_path + "/replicas/" + replica_name;

    if (context_.hasZooKeeper())
        current_zookeeper = context_.getZooKeeper();

    if (!current_zookeeper)
        throw Exception("Can't create replicated database without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

    /// Create the parent nodes first, then the (empty) root node of the database itself.
    current_zookeeper->createAncestors(zookeeper_path);
    current_zookeeper->createOrUpdate(zookeeper_path, String(), zkutil::CreateMode::Persistent);

    // TODO if no last_entry then make it equal to 0 in zk;

    // TODO launch a worker here
    /// Started last, after the members it reads above have been set up.
    main_thread = ThreadFromGlobalPool(&DatabaseReplicated::runMainThread, this);
}
|
|
|
|
|
|
|
|
/// Asks the background worker to stop and waits for it to finish.
DatabaseReplicated::~DatabaseReplicated()
{
    stop_flag = true;

    /// runMainThread() checks stop_flag only at the top of its loop, so this waits
    /// until the iteration currently in progress (including its sleep) is over.
    main_thread.join();
}
|
|
|
|
|
|
|
|
void DatabaseReplicated::runMainThread() {
|
|
|
|
setThreadName("ReplctdWorker"); // ok whatever. 15 bytes // + database_name);
|
|
|
|
LOG_DEBUG(log, "Started " << database_name << " database worker thread\n Replica: " << replica_name);
|
|
|
|
|
|
|
|
while (!stop_flag) {
|
|
|
|
attachToThreadGroup();
|
|
|
|
|
2020-05-11 13:31:14 +00:00
|
|
|
sleepForSeconds(2);
|
2020-05-11 12:55:17 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
2020-05-11 13:31:14 +00:00
|
|
|
String last_n;
|
|
|
|
if (!current_zookeeper->tryGet(zookeeper_path + "/last_entry", last_n, {}, NULL)) {
|
|
|
|
continue;
|
|
|
|
}
|
2020-05-11 12:55:17 +00:00
|
|
|
size_t last_n_parsed = parse<size_t>(last_n);
|
2020-05-11 13:31:14 +00:00
|
|
|
LOG_DEBUG(log, "PARSED " << last_n_parsed);
|
|
|
|
LOG_DEBUG(log, "LOCAL CURRENT " << current_log_entry_n);
|
2020-05-11 12:55:17 +00:00
|
|
|
while (current_log_entry_n < last_n_parsed) {
|
|
|
|
current_log_entry_n++;
|
|
|
|
executeLog(current_log_entry_n);
|
|
|
|
}
|
2020-05-11 13:31:14 +00:00
|
|
|
// break; // debug purpose
|
2020-05-11 12:55:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void DatabaseReplicated::executeLog(size_t n) {
|
|
|
|
|
|
|
|
LOG_DEBUG(log, "EXECUTING LOG! DB: " << database_name << "\n Replica: " << replica_name << "LOG N" << n);
|
|
|
|
current_context = std::make_unique<Context>(context);
|
|
|
|
current_context->from_replicated_log = true;
|
|
|
|
current_context->setCurrentQueryId(""); // generate random query_id
|
|
|
|
current_zookeeper = getZooKeeper();
|
|
|
|
|
|
|
|
String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL);
|
|
|
|
ReadBufferFromString istr(query_to_execute);
|
|
|
|
String dummy_string;
|
|
|
|
WriteBufferFromString ostr(dummy_string);
|
|
|
|
executeQuery(istr, ostr, false, context, {});
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO we might not need it here at all
|
|
|
|
void DatabaseReplicated::attachToThreadGroup() {
|
|
|
|
if (thread_group)
|
|
|
|
{
|
|
|
|
/// Put all threads to one thread pool
|
|
|
|
CurrentThread::attachToIfDetached(thread_group);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
CurrentThread::initializeQuery();
|
|
|
|
thread_group = CurrentThread::getGroup();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// taken from ddlworker
|
|
|
|
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
|
|
|
|
const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
|
|
|
|
{
|
|
|
|
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
|
|
|
|
zookeeper_holder->initFromInstance(zookeeper);
|
|
|
|
return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
|
2020-04-05 12:18:51 +00:00
|
|
|
}
|
|
|
|
|
2020-04-30 16:15:27 +00:00
|
|
|
|
2020-05-05 14:16:59 +00:00
|
|
|
void DatabaseReplicated::propose(const ASTPtr & query) {
|
2020-05-11 12:55:17 +00:00
|
|
|
// TODO if source is zk then omit propose. Throw?
|
|
|
|
|
|
|
|
// TODO remove that log message i think
|
2020-05-05 14:16:59 +00:00
|
|
|
LOG_DEBUG(log, "PROPOSING\n" << queryToString(query));
|
2020-04-05 12:18:51 +00:00
|
|
|
|
2020-05-11 12:55:17 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
|
|
|
auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "lock", replica_name);
|
|
|
|
|
|
|
|
// TODO check that last_entry is the same as current_log_entry_n for the replica
|
|
|
|
|
|
|
|
current_log_entry_n++; // starting from 1
|
|
|
|
String log_entry = zookeeper_path + "/log." + std::to_string(current_log_entry_n);
|
|
|
|
current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent);
|
|
|
|
|
|
|
|
current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent);
|
|
|
|
|
|
|
|
lock->unlock();
|
|
|
|
// write to metastore the last entry?
|
|
|
|
}
|
2020-04-05 12:18:51 +00:00
|
|
|
|
|
|
|
}
|