ddl replication works

This commit is contained in:
Val 2020-05-12 16:35:05 +03:00
parent 5eea58039c
commit d61259cd7b
2 changed files with 16 additions and 8 deletions

View File

@ -32,6 +32,7 @@
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h> #include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h> #include <Common/ZooKeeper/Types.h>
@ -81,7 +82,6 @@ DatabaseReplicated::DatabaseReplicated(
// TODO add constructor to Atomic and call it here with path and logger name specification // TODO add constructor to Atomic and call it here with path and logger name specification
// TODO ask why const and & are ommited in Atomic // TODO ask why const and & are ommited in Atomic
: DatabaseOrdinary(name_, metadata_path_, context_) : DatabaseOrdinary(name_, metadata_path_, context_)
, context(context_)
, zookeeper_path(zookeeper_path_) , zookeeper_path(zookeeper_path_)
, replica_name(replica_name_) , replica_name(replica_name_)
{ {
@ -142,17 +142,26 @@ void DatabaseReplicated::runMainThread() {
void DatabaseReplicated::executeLog(size_t n) { void DatabaseReplicated::executeLog(size_t n) {
LOG_DEBUG(log, "EXECUTING LOG! DB: " << database_name << "\n Replica: " << replica_name << "LOG N" << n);
current_context = std::make_unique<Context>(context);
current_context->from_replicated_log = true;
current_context->setCurrentQueryId(""); // generate random query_id
current_zookeeper = getZooKeeper(); current_zookeeper = getZooKeeper();
String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL); String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL);
ReadBufferFromString istr(query_to_execute); ReadBufferFromString istr(query_to_execute);
String dummy_string; String dummy_string;
WriteBufferFromString ostr(dummy_string); WriteBufferFromString ostr(dummy_string);
executeQuery(istr, ostr, false, context, {});
try
{
current_context = std::make_unique<Context>(global_context);
current_context->from_replicated_log = true;
current_context->setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, *current_context, {});
}
catch (...)
{
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully");
}
LOG_DEBUG(log, "Executed query: " << query_to_execute);
} }
// TODO we might not need it here at all // TODO we might not need it here at all

View File

@ -34,7 +34,6 @@ private:
void executeLog(size_t n); void executeLog(size_t n);
Context & context; // is it overkiill?
std::unique_ptr<Context> current_context; // to run executeQuery std::unique_ptr<Context> current_context; // to run executeQuery
std::atomic<size_t> current_log_entry_n = 0; std::atomic<size_t> current_log_entry_n = 0;