log based replicated

This commit is contained in:
Val 2020-05-11 15:55:17 +03:00
parent 319256ef4f
commit 0a860c0c2b
9 changed files with 142 additions and 124 deletions

View File

@ -6,11 +6,13 @@
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabasesCommon.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -24,6 +26,7 @@
#include <Poco/DirectoryIterator.h>
#include <Poco/Event.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
@ -33,8 +36,10 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/Lock.h>
#include <ext/scope_guard.h>
#include <common/sleep.h>
namespace DB
{
@ -75,13 +80,11 @@ DatabaseReplicated::DatabaseReplicated(
// : DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseReplicated (" + name_ + ")", context_)
// TODO add constructor to Atomic and call it here with path and logger name specification
// TODO ask why const and & are ommited in Atomic
: DatabaseAtomic(name_, metadata_path_, context_)
: DatabaseOrdinary(name_, metadata_path_, context_)
, context(context_)
, zookeeper_path(zookeeper_path_)
, replica_name(replica_name_)
{
LOG_DEBUG(log, "METADATA PATH ARGUMENT " << metadata_path_);
LOG_DEBUG(log, "METADATA PATH ACTUAL " << getMetadataPath());
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
@ -103,94 +106,96 @@ DatabaseReplicated::DatabaseReplicated(
current_zookeeper->createAncestors(zookeeper_path);
current_zookeeper->createOrUpdate(zookeeper_path, String(), zkutil::CreateMode::Persistent);
// TODO if no last_entry then make it equal to 0 in zk;
// TODO launch a worker here
main_thread = ThreadFromGlobalPool(&DatabaseReplicated::runMainThread, this);
}
DatabaseReplicated::~DatabaseReplicated()
{
stop_flag = true;
main_thread.join();
}
void DatabaseReplicated::runMainThread() {
setThreadName("ReplctdWorker"); // ok whatever. 15 bytes // + database_name);
LOG_DEBUG(log, "Started " << database_name << " database worker thread\n Replica: " << replica_name);
while (!stop_flag) {
attachToThreadGroup();
sleepForSeconds(10);
current_zookeeper = getZooKeeper();
String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL);
size_t last_n_parsed = parse<size_t>(last_n);
while (current_log_entry_n < last_n_parsed) {
current_log_entry_n++;
executeLog(current_log_entry_n);
}
break; // debug purpose
}
}
void DatabaseReplicated::executeLog(size_t n) {
LOG_DEBUG(log, "EXECUTING LOG! DB: " << database_name << "\n Replica: " << replica_name << "LOG N" << n);
current_context = std::make_unique<Context>(context);
current_context->from_replicated_log = true;
current_context->setCurrentQueryId(""); // generate random query_id
current_zookeeper = getZooKeeper();
String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL);
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
executeQuery(istr, ostr, false, context, {});
}
// TODO we might not need it here at all
void DatabaseReplicated::attachToThreadGroup() {
if (thread_group)
{
/// Put all threads to one thread pool
CurrentThread::attachToIfDetached(thread_group);
}
else
{
CurrentThread::initializeQuery();
thread_group = CurrentThread::getGroup();
}
}
// taken from ddlworker
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
}
void DatabaseReplicated::propose(const ASTPtr & query) {
// TODO if source is zk then omit propose. Throw?
// TODO remove that log message i think
LOG_DEBUG(log, "PROPOSING\n" << queryToString(query));
current_zookeeper = getZooKeeper();
auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "lock", replica_name);
// TODO check that last_entry is the same as current_log_entry_n for the replica
current_log_entry_n++; // starting from 1
String log_entry = zookeeper_path + "/log." + std::to_string(current_log_entry_n);
current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent);
current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent);
lock->unlock();
// write to metastore the last entry?
}
// void DatabaseReplicated::createTable(
// const Context & context,
// const String & table_name,
// const StoragePtr & table,
// const ASTPtr & query)
// {
// LOG_DEBUG(log, "CREATE TABLE");
//
//
// DatabaseOnDisk::createTable(context, table_name, table, query);
//
// // String statement = getObjectDefinitionFromCreateQuery(query);
//
// // current_zookeeper = getZooKeeper();
// // current_zookeeper->createOrUpdate(replica_path + "/" + table_name + ".sql", statement, zkutil::CreateMode::Persistent);
// return;
// }
//
//
// void DatabaseReplicated::renameTable(
// const Context & context,
// const String & table_name,
// IDatabase & to_database,
// const String & to_table_name,
// bool exchange)
// {
// LOG_DEBUG(log, "RENAME TABLE");
// DatabaseAtomic::renameTable(context, table_name, to_database, to_table_name, exchange);
// // try
// // DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, exchange);
// // replicated stuff; what to put to a znode
// // String statement = getObjectDefinitionFromCreateQuery(query);
// // this one is fairly more complex
// // current_zookeeper = getZooKeeper();
//
// // no need for now to have stat
// // Coordination::Stat metadata_stat;
// // auto statement = current_zookeeper->get(replica_path + "/" + table_name, &metadata_stat);
// // current_zookeeper->createOrUpdate(replica_path + "/" + to_table_name, statement, zkutil::CreateMode::Persistent);
// // current_zookeeper->remove(replica_path + "/" + table_name);
// // TODO add rename statement to the log
// return;
// }
//
// void DatabaseReplicated::dropTable(
// const Context & context,
// const String & table_name,
// bool no_delay)
// {
// LOG_DEBUG(log, "DROP TABLE");
// DatabaseAtomic::dropTable(context, table_name, no_delay);
// // try
// // DatabaseOnDisk::dropTable(context, table_name, no_delay);
//
// // let's do dumb remove from zk at the first iteration
// // current_zookeeper = getZooKeeper();
// // current_zookeeper->remove(replica_path + "/" + table_name);
// return;
// }
//
// void DatabaseReplicated::drop(const Context & context)
// {
// LOG_DEBUG(log, "DROP");
// DatabaseAtomic::drop(context);
// // current_zookeeper = getZooKeeper();
// // current_zookeeper->remove(replica_path);
//
// // DatabaseOnDisk::drop(context); // no throw
// return;
// }
//
// void DatabaseReplicated::loadStoredObjects(
// Context & context,
// bool has_force_restore_data_flag)
// {
// DatabaseOrdinary::loadStoredObjects(context, has_force_restore_data_flag);
// // launch a worker maybe. i don't know
// // DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag);
//
// return;
// }
}

View File

@ -1,9 +1,12 @@
#pragma once
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <atomic>
#include <thread>
namespace DB
{
/** Replicated database engine.
@ -11,49 +14,35 @@ namespace DB
* that contain declaration of table represented by SQL ATTACH TABLE query
* and operation log in zookeeper
*/
class DatabaseReplicated : public DatabaseAtomic
class DatabaseReplicated : public DatabaseOrdinary
{
public:
DatabaseReplicated(const String & name_, const String & metadata_path_, const String & zookeeper_path_, const String & replica_name_, Context & context);
// void drop(const Context & context) override;
~DatabaseReplicated();
String getEngineName() const override { return "Replicated"; }
void propose(const ASTPtr & query) override;
// void createTable(
// const Context & context,
// const String & table_name,
// const StoragePtr & table,
// const ASTPtr & query) override;
//
// void dropTable(
// const Context & context,
// const String & table_name,
// bool no_delay) override;
//
// void renameTable(
// const Context & context,
// const String & table_name,
// IDatabase & to_database,
// const String & to_table_name,
// bool exchange) override;
//
// void alterTable(
// const Context & context,
// const StorageID & table_id,
// const StorageInMemoryMetadata & metadata) override;
// void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
//
// StoragePtr detachTable(const String & name) override;
// void loadStoredObjects(
// Context & context,
// bool has_force_restore_data_flag) override;
private:
void runMainThread();
void runCleanupThread();
void attachToThreadGroup();
void executeLog(size_t n);
Context & context; // is it overkiill?
std::unique_ptr<Context> current_context; // to run executeQuery
size_t current_log_entry_n = 0;
std::atomic<bool> stop_flag{false};
ThreadFromGlobalPool main_thread;
ThreadGroupStatusPtr thread_group;
String zookeeper_path;
String replica_name;
String replica_path;

View File

@ -38,6 +38,7 @@ public:
NO_QUERY = 0, /// Uninitialized object.
INITIAL_QUERY = 1,
SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed or ON CLUSTER query execution.
REPLICATED_LOG_QUERY = 3, /// TODO add comment
};

View File

@ -214,6 +214,9 @@ private:
Context();
public:
///testing
bool from_replicated_log = false;
/// Create initial Context with ContextShared and etc.
static Context createGlobal(ContextShared * shared);
static SharedContextHolder createShared();

View File

@ -585,7 +585,8 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec
try
{
current_context = std::make_unique<Context>(context);
current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
//current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
current_context->from_replicated_log = true;
current_context->setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, *current_context, {});
}

View File

@ -15,6 +15,8 @@
#include <Common/typeid_cast.h>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <algorithm>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseFactory.h>
namespace DB
@ -37,6 +39,7 @@ BlockIO InterpreterAlterQuery::execute()
{
const auto & alter = query_ptr->as<ASTAlterQuery &>();
if (!alter.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess());
@ -46,6 +49,12 @@ BlockIO InterpreterAlterQuery::execute()
auto alter_lock = table->lockForAlter(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
// TODO it's dirty. need to add database to parsing stage
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (database->getEngineName() == "Replicated" && !context.from_replicated_log) {
database->propose(query_ptr);
}
/// Add default database to table identifiers that we can encounter in e.g. default expressions,
/// mutation expression, etc.
AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName());

View File

@ -622,7 +622,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
if (need_add_to_database)
{
database = DatabaseCatalog::instance().getDatabase(create.database);
if (database->getEngineName() == "Atomic" || database->getEngineName() == "Replicated")
if (database->getEngineName() == "Atomic") // || database->getEngineName() == "Replicated")
{
/// TODO implement ATTACH FROM 'path/to/data': generate UUID and move table data to store/
if (create.attach && create.uuid == UUIDHelpers::Nil)
@ -697,7 +697,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
if (database->getEngineName() == "Replicated") {
if (database->getEngineName() == "Replicated" && !context.from_replicated_log) {
// propose
// try to
database->propose(query_ptr);

View File

@ -97,6 +97,9 @@ BlockIO InterpreterDropQuery::executeToTable(
if (database->getEngineName() != "Atomic")
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table from memory, don't touch data and metadata
if (database->getEngineName() == "Replicated" && !context.from_replicated_log) {
database->propose(query_ptr);
}
database->detachTable(table_id.table_name);
}
else if (query.kind == ASTDropQuery::Kind::Truncate)
@ -120,6 +123,9 @@ BlockIO InterpreterDropQuery::executeToTable(
if (database->getEngineName() != "Atomic")
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (database->getEngineName() == "Replicated" && !context.from_replicated_log) {
database->propose(query_ptr);
}
database->dropTable(context, table_id.table_name, query.no_delay);
}
}

View File

@ -80,7 +80,11 @@ BlockIO InterpreterRenameQuery::execute()
if (!rename.exchange)
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);
database_catalog.getDatabase(elem.from_database_name)->renameTable(
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (database->getEngineName() == "Replicated" && !context.from_replicated_log) {
database->propose(query_ptr);
}
database->renameTable(
context,
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),