diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index a1eb910dedf..1bc954bfb76 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -6,11 +6,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -24,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -33,8 +36,10 @@ #include #include #include +#include #include +#include namespace DB { @@ -75,13 +80,11 @@ DatabaseReplicated::DatabaseReplicated( // : DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseReplicated (" + name_ + ")", context_) // TODO add constructor to Atomic and call it here with path and logger name specification // TODO ask why const and & are ommited in Atomic - : DatabaseAtomic(name_, metadata_path_, context_) + : DatabaseOrdinary(name_, metadata_path_, context_) + , context(context_) , zookeeper_path(zookeeper_path_) , replica_name(replica_name_) { - LOG_DEBUG(log, "METADATA PATH ARGUMENT " << metadata_path_); - LOG_DEBUG(log, "METADATA PATH ACTUAL " << getMetadataPath()); - if (!zookeeper_path.empty() && zookeeper_path.back() == '/') zookeeper_path.resize(zookeeper_path.size() - 1); // If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it. @@ -103,94 +106,96 @@ DatabaseReplicated::DatabaseReplicated( current_zookeeper->createAncestors(zookeeper_path); current_zookeeper->createOrUpdate(zookeeper_path, String(), zkutil::CreateMode::Persistent); + // TODO if no last_entry then make it equal to 0 in zk; + // TODO launch a worker here + + main_thread = ThreadFromGlobalPool(&DatabaseReplicated::runMainThread, this); +} + +DatabaseReplicated::~DatabaseReplicated() +{ + stop_flag = true; + main_thread.join(); +} + +void DatabaseReplicated::runMainThread() { + setThreadName("ReplctdWorker"); // ok whatever. 15 bytes // + database_name); + LOG_DEBUG(log, "Started " << database_name << " database worker thread\n Replica: " << replica_name); + + while (!stop_flag) { + attachToThreadGroup(); + + sleepForSeconds(10); + current_zookeeper = getZooKeeper(); + String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL); + size_t last_n_parsed = parse(last_n); + while (current_log_entry_n < last_n_parsed) { + current_log_entry_n++; + executeLog(current_log_entry_n); + } + break; // debug purpose + } +} + +void DatabaseReplicated::executeLog(size_t n) { + + LOG_DEBUG(log, "EXECUTING LOG! DB: " << database_name << "\n Replica: " << replica_name << "LOG N" << n); + current_context = std::make_unique(context); + current_context->from_replicated_log = true; + current_context->setCurrentQueryId(""); // generate random query_id + current_zookeeper = getZooKeeper(); + + String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL); + ReadBufferFromString istr(query_to_execute); + String dummy_string; + WriteBufferFromString ostr(dummy_string); + executeQuery(istr, ostr, false, context, {}); +} + +// TODO we might not need it here at all +void DatabaseReplicated::attachToThreadGroup() { + if (thread_group) + { + /// Put all threads to one thread pool + CurrentThread::attachToIfDetached(thread_group); + } + else + { + CurrentThread::initializeQuery(); + thread_group = CurrentThread::getGroup(); + } +} + +// taken from ddlworker +static std::unique_ptr createSimpleZooKeeperLock( + const std::shared_ptr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message) +{ + auto zookeeper_holder = std::make_shared(); + zookeeper_holder->initFromInstance(zookeeper); + return std::make_unique(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message); } void DatabaseReplicated::propose(const ASTPtr & query) { + // TODO if source is zk then omit propose. Throw? + + // TODO remove that log message i think LOG_DEBUG(log, "PROPOSING\n" << queryToString(query)); + + current_zookeeper = getZooKeeper(); + auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "lock", replica_name); + + // TODO check that last_entry is the same as current_log_entry_n for the replica + + current_log_entry_n++; // starting from 1 + String log_entry = zookeeper_path + "/log." + std::to_string(current_log_entry_n); + current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent); + + current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent); + + lock->unlock(); + // write to metastore the last entry? } -// void DatabaseReplicated::createTable( -// const Context & context, -// const String & table_name, -// const StoragePtr & table, -// const ASTPtr & query) -// { -// LOG_DEBUG(log, "CREATE TABLE"); -// -// -// DatabaseOnDisk::createTable(context, table_name, table, query); -// -// // String statement = getObjectDefinitionFromCreateQuery(query); -// -// // current_zookeeper = getZooKeeper(); -// // current_zookeeper->createOrUpdate(replica_path + "/" + table_name + ".sql", statement, zkutil::CreateMode::Persistent); -// return; -// } -// -// -// void DatabaseReplicated::renameTable( -// const Context & context, -// const String & table_name, -// IDatabase & to_database, -// const String & to_table_name, -// bool exchange) -// { -// LOG_DEBUG(log, "RENAME TABLE"); -// DatabaseAtomic::renameTable(context, table_name, to_database, to_table_name, exchange); -// // try -// // DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, exchange); -// // replicated stuff; what to put to a znode -// // String statement = getObjectDefinitionFromCreateQuery(query); -// // this one is fairly more complex -// // current_zookeeper = getZooKeeper(); -// -// // no need for now to have stat -// // Coordination::Stat metadata_stat; -// // auto statement = current_zookeeper->get(replica_path + "/" + table_name, &metadata_stat); -// // current_zookeeper->createOrUpdate(replica_path + "/" + to_table_name, statement, zkutil::CreateMode::Persistent); -// // current_zookeeper->remove(replica_path + "/" + table_name); -// // TODO add rename statement to the log -// return; -// } -// -// void DatabaseReplicated::dropTable( -// const Context & context, -// const String & table_name, -// bool no_delay) -// { -// LOG_DEBUG(log, "DROP TABLE"); -// DatabaseAtomic::dropTable(context, table_name, no_delay); -// // try -// // DatabaseOnDisk::dropTable(context, table_name, no_delay); -// -// // let's do dumb remove from zk at the first iteration -// // current_zookeeper = getZooKeeper(); -// // current_zookeeper->remove(replica_path + "/" + table_name); -// return; -// } -// -// void DatabaseReplicated::drop(const Context & context) -// { -// LOG_DEBUG(log, "DROP"); -// DatabaseAtomic::drop(context); -// // current_zookeeper = getZooKeeper(); -// // current_zookeeper->remove(replica_path); -// -// // DatabaseOnDisk::drop(context); // no throw -// return; -// } -// -// void DatabaseReplicated::loadStoredObjects( -// Context & context, -// bool has_force_restore_data_flag) -// { -// DatabaseOrdinary::loadStoredObjects(context, has_force_restore_data_flag); -// // launch a worker maybe. i don't know -// // DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag); -// -// return; -// } - } diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index df6f86c1491..d61f0a00ef8 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -1,9 +1,12 @@ #pragma once -#include +#include #include #include +#include +#include + namespace DB { /** Replicated database engine. @@ -11,49 +14,35 @@ namespace DB * that contain declaration of table represented by SQL ATTACH TABLE query * and operation log in zookeeper */ -class DatabaseReplicated : public DatabaseAtomic +class DatabaseReplicated : public DatabaseOrdinary { public: DatabaseReplicated(const String & name_, const String & metadata_path_, const String & zookeeper_path_, const String & replica_name_, Context & context); -// void drop(const Context & context) override; + ~DatabaseReplicated(); String getEngineName() const override { return "Replicated"; } void propose(const ASTPtr & query) override; -// void createTable( -// const Context & context, -// const String & table_name, -// const StoragePtr & table, -// const ASTPtr & query) override; -// -// void dropTable( -// const Context & context, -// const String & table_name, -// bool no_delay) override; -// -// void renameTable( -// const Context & context, -// const String & table_name, -// IDatabase & to_database, -// const String & to_table_name, -// bool exchange) override; -// -// void alterTable( -// const Context & context, -// const StorageID & table_id, -// const StorageInMemoryMetadata & metadata) override; - -// void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override; -// -// StoragePtr detachTable(const String & name) override; - -// void loadStoredObjects( -// Context & context, -// bool has_force_restore_data_flag) override; - private: + + void runMainThread(); + void runCleanupThread(); + + void attachToThreadGroup(); + + void executeLog(size_t n); + + Context & context; // is it overkiill? + std::unique_ptr current_context; // to run executeQuery + + size_t current_log_entry_n = 0; + std::atomic stop_flag{false}; + + ThreadFromGlobalPool main_thread; + ThreadGroupStatusPtr thread_group; + String zookeeper_path; String replica_name; String replica_path; diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 704fba3b3ef..2dff30e40a2 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -38,6 +38,7 @@ public: NO_QUERY = 0, /// Uninitialized object. INITIAL_QUERY = 1, SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed or ON CLUSTER query execution. + REPLICATED_LOG_QUERY = 3, /// TODO add comment }; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 5a4e959229f..66ea6f6914c 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -214,6 +214,9 @@ private: Context(); public: + ///testing + bool from_replicated_log = false; + /// Create initial Context with ContextShared and etc. static Context createGlobal(ContextShared * shared); static SharedContextHolder createShared(); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 28436f192b0..65f984924a3 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -585,7 +585,8 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec try { current_context = std::make_unique(context); - current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + //current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + current_context->from_replicated_log = true; current_context->setCurrentQueryId(""); // generate random query_id executeQuery(istr, ostr, false, *current_context, {}); } diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 61277b8160c..ad79bd68fed 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include namespace DB @@ -37,6 +39,7 @@ BlockIO InterpreterAlterQuery::execute() { const auto & alter = query_ptr->as(); + if (!alter.cluster.empty()) return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess()); @@ -46,6 +49,12 @@ BlockIO InterpreterAlterQuery::execute() auto alter_lock = table->lockForAlter(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); auto metadata_snapshot = table->getInMemoryMetadataPtr(); + // TODO it's dirty. need to add database to parsing stage + DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name); + if (database->getEngineName() == "Replicated" && !context.from_replicated_log) { + database->propose(query_ptr); + } + /// Add default database to table identifiers that we can encounter in e.g. default expressions, /// mutation expression, etc. AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName()); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 99c021a72fa..5698c370fa1 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -622,7 +622,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, if (need_add_to_database) { database = DatabaseCatalog::instance().getDatabase(create.database); - if (database->getEngineName() == "Atomic" || database->getEngineName() == "Replicated") + if (database->getEngineName() == "Atomic") // || database->getEngineName() == "Replicated") { /// TODO implement ATTACH FROM 'path/to/data': generate UUID and move table data to store/ if (create.attach && create.uuid == UUIDHelpers::Nil) @@ -697,7 +697,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, } - if (database->getEngineName() == "Replicated") { + if (database->getEngineName() == "Replicated" && !context.from_replicated_log) { // propose // try to database->propose(query_ptr); diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index e6853a8af4c..bae1b796016 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -97,6 +97,9 @@ BlockIO InterpreterDropQuery::executeToTable( if (database->getEngineName() != "Atomic") table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); /// Drop table from memory, don't touch data and metadata + if (database->getEngineName() == "Replicated" && !context.from_replicated_log) { + database->propose(query_ptr); + } database->detachTable(table_id.table_name); } else if (query.kind == ASTDropQuery::Kind::Truncate) @@ -120,6 +123,9 @@ BlockIO InterpreterDropQuery::executeToTable( if (database->getEngineName() != "Atomic") table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); + if (database->getEngineName() == "Replicated" && !context.from_replicated_log) { + database->propose(query_ptr); + } database->dropTable(context, table_id.table_name, query.no_delay); } } diff --git a/src/Interpreters/InterpreterRenameQuery.cpp b/src/Interpreters/InterpreterRenameQuery.cpp index de2b6bb0c1c..d93b14a6bc2 100644 --- a/src/Interpreters/InterpreterRenameQuery.cpp +++ b/src/Interpreters/InterpreterRenameQuery.cpp @@ -80,7 +80,11 @@ BlockIO InterpreterRenameQuery::execute() if (!rename.exchange) database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context); - database_catalog.getDatabase(elem.from_database_name)->renameTable( + DatabasePtr database = database_catalog.getDatabase(elem.from_database_name); + if (database->getEngineName() == "Replicated" && !context.from_replicated_log) { + database->propose(query_ptr); + } + database->renameTable( context, elem.from_table_name, *database_catalog.getDatabase(elem.to_database_name),