Merge pull request #1 from yandex/repair-subtree2

Repair subtree2
This commit is contained in:
alexey-milovidov 2016-06-08 21:22:02 +03:00
commit 332df69f39
10 changed files with 152 additions and 66 deletions

View File

@ -163,7 +163,7 @@ public:
protected:
int run();
void waitForTerminationRequest();
virtual void waitForTerminationRequest();
#if !defined(_WIN32_WCE)
void defineOptions(OptionSet& options);
#endif

View File

@ -129,20 +129,10 @@ public:
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);
/// Для сообщений в логе и в эксепшенах.
const String & getDescription() const
{
return description;
}
const String & getHost() const
{
return host;
}
UInt16 getPort() const
{
return port;
}
const String & getDescription() const;
const String & getHost() const;
UInt16 getPort() const;
const String & getDefaultDatabase() const;
/// Если последний флаг true, то затем необходимо вызвать sendExternalTablesData
void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete,

View File

@ -88,7 +88,7 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str
std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace);
void rethrowFirstException(Exceptions & exceptions);
void rethrowFirstException(const Exceptions & exceptions);
std::unique_ptr<Poco::Exception> convertCurrentException();

View File

@ -142,12 +142,30 @@ void Connection::receiveHello()
}
}
void Connection::setDefaultDatabase(const String & database)
{
default_database = database;
}
const String & Connection::getDefaultDatabase() const
{
return default_database;
}
const String & Connection::getDescription() const
{
return description;
}
const String & Connection::getHost() const
{
return host;
}
UInt16 Connection::getPort() const
{
return port;
}
void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision)
{

View File

@ -139,7 +139,7 @@ std::unique_ptr<Poco::Exception> convertCurrentException()
}
void rethrowFirstException(Exceptions & exceptions)
void rethrowFirstException(const Exceptions & exceptions)
{
for (size_t i = 0, size = exceptions.size(); i < size; ++i)
if (exceptions[i])

View File

@ -44,6 +44,7 @@ else
# нужно для stash или неполноценной копии репозитория
revision="77777"
fi
revision=$(echo $revision | sed 's/\([0-9]*\)[^0-9]*/\1/')
echo "
#ifndef REVISION

View File

@ -52,6 +52,8 @@ namespace Poco { class TaskManager; }
class BaseDaemon : public Poco::Util::ServerApplication
{
friend class SignalListener;
public:
BaseDaemon();
~BaseDaemon();
@ -132,6 +134,14 @@ protected:
/// Используется при exitOnTaskError()
void handleNotification(Poco::TaskFailedNotification *);
/// thread safe
virtual void handleSignal(int signal_id);
/// реализация обработки сигналов завершения через pipe не требует блокировки сигнала с помощью sigprocmask во всех потоках
void waitForTerminationRequest() override;
/// thread safe
virtual void onInterruptSignals(int signal_id);
std::unique_ptr<Poco::TaskManager> task_manager;
/// Создание и автоматическое удаление pid файла.
@ -156,8 +166,7 @@ protected:
PID pid;
/// Получен ли сигнал на завершение? Этот флаг устанавливается в BaseDaemonApplication.
bool is_cancelled = false;
std::atomic_bool is_cancelled{false};
/// Флаг устанавливается по сообщению из Task (при аварийном завершении).
bool task_failed = false;
@ -179,4 +188,8 @@ protected:
std::unique_ptr<GraphiteWriter> graphite_writer;
boost::optional<size_t> layer;
std::mutex signal_handler_mutex;
std::condition_variable signal_event;
size_t terminate_signals_counter = 0;
};

View File

@ -131,9 +131,9 @@ static void call_default_signal_handler(int sig)
using ThreadNumber = decltype(Poco::ThreadNumber::get());
static const size_t buf_size = sizeof(int) + sizeof(siginfo_t) + sizeof(ucontext_t) + sizeof(ThreadNumber);
using signal_function = void(int, siginfo_t*, void*);
/** Обработчик сигналов HUP / USR1 */
static void close_logs_signal_handler(int sig, siginfo_t * info, void * context)
static void writeSignalIDtoSignalPipe(int sig)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
@ -141,10 +141,21 @@ static void close_logs_signal_handler(int sig, siginfo_t * info, void * context)
out.next();
}
/** Обработчик сигналов HUP / USR1 */
static void closeLogsSignalHandler(int sig, siginfo_t * info, void * context)
{
writeSignalIDtoSignalPipe(sig);
}
static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * context)
{
writeSignalIDtoSignalPipe(sig);
}
/** Обработчик некоторых сигналов. Выводит информацию в лог (если получится).
*/
static void fault_signal_handler(int sig, siginfo_t * info, void * context)
static void faultSignalHandler(int sig, siginfo_t * info, void * context)
{
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.write_fd, buf_size, buf);
@ -174,7 +185,9 @@ static bool already_printed_stack_trace = false;
class SignalListener : public Poco::Runnable
{
public:
SignalListener() : log(&Logger::get("BaseDaemon"))
SignalListener(BaseDaemon & daemon_)
: log(&Logger::get("BaseDaemon"))
, daemon(daemon_)
{
}
@ -204,6 +217,12 @@ public:
onTerminate(message, thread_num);
}
else if (sig == SIGINT ||
sig == SIGQUIT ||
sig == SIGTERM)
{
daemon.handleSignal(sig);
}
else
{
siginfo_t info;
@ -221,8 +240,9 @@ public:
private:
Logger * log;
BaseDaemon & daemon;
private:
void onTerminate(const std::string & message, ThreadNumber thread_num) const
{
LOG_ERROR(log, "(from thread " << thread_num << ") " << message);
@ -739,42 +759,31 @@ void BaseDaemon::initialize(Application& self)
std::set_terminate(terminate_handler);
/// Ставим обработчики сигналов
auto add_signal_handler =
[](const std::vector<int> & signals, signal_function handler)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = fault_signal_handler;
sa.sa_sigaction = handler;
sa.sa_flags = SA_SIGINFO;
{
int signals[] = {SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, 0};
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaddset(&sa.sa_mask, signals[i]))
for (auto signal : signals)
if (sigaddset(&sa.sa_mask, signal))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaction(signals[i], &sa, 0))
for (auto signal : signals)
if (sigaction(signal, &sa, 0))
throw Poco::Exception("Cannot set signal handler.");
}
};
sa.sa_sigaction = close_logs_signal_handler;
{
int signals[] = {SIGHUP, SIGUSR1, 0};
if (sigemptyset(&sa.sa_mask))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaddset(&sa.sa_mask, signals[i]))
throw Poco::Exception("Cannot set signal handler.");
for (size_t i = 0; signals[i]; ++i)
if (sigaction(signals[i], &sa, 0))
throw Poco::Exception("Cannot set signal handler.");
}
add_signal_handler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE}, faultSignalHandler);
add_signal_handler({SIGHUP, SIGUSR1}, closeLogsSignalHandler);
add_signal_handler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler);
/// Ставим ErrorHandler для потоков
static KillingErrorHandler killing_error_handler;
@ -783,7 +792,7 @@ void BaseDaemon::initialize(Application& self)
/// Выведем ревизию демона
logRevision();
signal_listener.reset(new SignalListener);
signal_listener.reset(new SignalListener(*this));
signal_listener_thread.start(*signal_listener);
graphite_writer.reset(new GraphiteWriter("graphite"));
@ -890,3 +899,35 @@ void BaseDaemon::PID::clear()
file.clear();
}
}
void BaseDaemon::handleSignal(int signal_id)
{
if (signal_id == SIGINT ||
signal_id == SIGQUIT ||
signal_id == SIGTERM)
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
{
++terminate_signals_counter;
signal_event.notify_all();
}
onInterruptSignals(signal_id);
}
else
throw DB::Exception(std::string("Unsupported signal: ") + strsignal(signal_id));
}
void BaseDaemon::onInterruptSignals(int signal_id)
{
is_cancelled = true;
LOG_INFO(&logger(), "Received termination signal(" << strsignal(signal_id) << ")");
}
void BaseDaemon::waitForTerminationRequest()
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
signal_event.wait(lock, [this](){ return terminate_signals_counter > 0; });
}

View File

@ -16,6 +16,6 @@ add_library(zkutil
include/zkutil/Types.h
include/zkutil/ZooKeeperHolder.h)
target_link_libraries(zkutil libzookeeper_mt.a pthread)
target_link_libraries(zkutil libzookeeper_mt.a pthread PocoFoundation)
add_subdirectory (src)

View File

@ -12,6 +12,8 @@ struct CallbackState
{
std::string path;
std::list<CallbackState>::const_iterator it;
std::list<std::list<CallbackState>::const_iterator> children;
int64_t dataLength = 0;
};
using CallbackStates = std::list<CallbackState>;
@ -19,10 +21,11 @@ CallbackStates states;
zkutil::ZooKeeper * zookeeper;
int running_count = 0;
Poco::Event completed;
void process(const CallbackState & state);
void process(CallbackState & state);
void callback(
int rc,
@ -30,13 +33,16 @@ void callback(
const Stat * stat,
const void * data)
{
const CallbackState * state = reinterpret_cast<const CallbackState *>(data);
CallbackState * state = reinterpret_cast<CallbackState *>(const_cast<void *>(data));
if (rc != ZOK && rc != ZNONODE)
{
std::cerr << zerror(rc) << ", path: " << state->path << "\n";
}
if (stat != nullptr)
state->dataLength = stat->dataLength;
if (rc == ZOK && strings)
{
for (int32_t i = 0; i < strings->count; ++i)
@ -44,23 +50,38 @@ void callback(
states.emplace_back();
states.back().path = state->path + (state->path == "/" ? "" : "/") + strings->data[i];
states.back().it = --states.end();
state->children.push_back(states.back().it);
std::cout << states.back().path << '\n';
process(states.back());
}
}
states.erase(state->it);
if (states.empty())
--running_count;
if (running_count == 0)
completed.set();
}
void process(const CallbackState & state)
void process(CallbackState & state)
{
++running_count;
zoo_awget_children2(zookeeper->getHandle(), state.path.data(), nullptr, nullptr, callback, &state);
}
typedef std::pair<int64_t, int64_t> NodesBytes;
NodesBytes printTree(const CallbackState & state)
{
int64_t nodes = 1;
int64_t bytes = state.dataLength;
for (auto child : state.children)
{
NodesBytes nodesBytes = printTree(*child);
nodes += nodesBytes.first;
bytes += nodesBytes.second;
}
std::cout << state.path << '\t' << nodes << '\t' << bytes <<'\n';
return NodesBytes(nodes, bytes);
}
int main(int argc, char ** argv)
try
@ -95,6 +116,8 @@ try
process(states.back());
completed.wait();
printTree(*states.begin());
}
catch (...)
{