Parsing Keeper commands via ClickHouse Parser

This commit is contained in:
pufit 2023-04-26 00:54:28 -04:00
parent 892e436046
commit 8bef8fc1de
11 changed files with 514 additions and 108 deletions

View File

@ -1,4 +1,4 @@
set (CLICKHOUSE_KEEPER_CLIENT_SOURCES KeeperClient.cpp)
set (CLICKHOUSE_KEEPER_CLIENT_SOURCES KeeperClient.cpp Parser.cpp Commands.cpp)
set (CLICKHOUSE_KEEPER_CLIENT_LINK
PRIVATE

View File

@ -0,0 +1,184 @@
#include "Commands.h"
#include "KeeperClient.h"
namespace DB
{
/// Parses the argument of `ls`: a single optional path.
/// When a path is present it is appended to `node->args`; the command is accepted either way.
bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (parseKeeperPath(pos, expected, path))
        node->args.push_back(std::move(path));

    /// The path is optional, so a missing path is not a parse error.
    return true;
}
/// Prints the children of the requested node on one line, separated by spaces.
/// The node is the first argument resolved through `getAbsolutePath`, or the client's `cwd` when no argument was given.
void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
    /// Start from the working path and override it only when a path argument exists.
    String path = client->cwd;
    if (!query->args.empty())
        path = client->getAbsolutePath(query->args[0].safeGet<String>());

    for (const auto & child_name : client->zookeeper->getChildren(path))
        std::cout << child_name << " ";
    std::cout << "\n";
}
/// Parses the argument of `cd`: a single optional path.
/// When a path is present it is appended to `node->args`; the command is accepted either way.
bool CDCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (parseKeeperPath(pos, expected, path))
        node->args.push_back(std::move(path));

    /// The path is optional, so a missing path is not a parse error.
    return true;
}
/// Changes the client's working path.
/// Without an argument the command does nothing. With an argument, the path is resolved through
/// `getAbsolutePath`; if the node exists it becomes the new `cwd`, otherwise an error is printed to stderr
/// and `cwd` is left unchanged.
void CDCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
    /// Bare `cd` keeps the current path. This guard also protects the `args[0]` access below:
    /// the previous (inverted) check returned when an argument WAS given and indexed an empty vector otherwise.
    if (query->args.empty())
        return;

    auto new_path = client->getAbsolutePath(query->args[0].safeGet<String>());
    if (!client->zookeeper->exists(new_path))
        std::cerr << "Path " << new_path << " does not exists\n";
    else
        client->cwd = new_path;
}
/// Parses the arguments of `set`: a required path, a required value and an optional numeric version.
/// Arguments are appended to `node->args` in that order as soon as each one is parsed.
/// Returns false if the path or the value is missing.
bool SetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;
    node->args.push_back(std::move(path));

    String value;
    if (!parseKeeperArg(pos, expected, value))
        return false;
    node->args.push_back(std::move(value));

    /// The version is optional; when present, its literal value becomes the third argument.
    ASTPtr version_literal;
    const bool has_version = ParserNumber{}.parse(pos, version_literal, expected);
    if (has_version)
        node->args.push_back(version_literal->as<ASTLiteral &>().value);

    return true;
}
void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
if (query->args.size() == 2)
client->zookeeper->set(client->getAbsolutePath(query->args[0].safeGet<String>()), query->args[1].safeGet<String>());
else
client->zookeeper->set(
client->getAbsolutePath(query->args[0].safeGet<String>()),
query->args[1].safeGet<String>(),
static_cast<Int32>(query->args[2].safeGet<Int64>()));
}
/// Parses the arguments of `create`: a required path followed by a required value.
/// Each argument is appended to `node->args` as soon as it is parsed; returns false if either is missing.
bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;
    node->args.push_back(std::move(path));

    String value;
    if (!parseKeeperArg(pos, expected, value))
        return false;
    node->args.push_back(std::move(value));

    return true;
}
void CreateCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
client->zookeeper->create(
client->getAbsolutePath(query->args[0].safeGet<String>()),
query->args[1].safeGet<String>(),
zkutil::CreateMode::Persistent);
}
/// Parses the single required path argument of `get` and appends it to `node->args`.
/// Returns false when no path could be parsed.
bool GetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    const bool has_path = parseKeeperPath(pos, expected, path);
    if (has_path)
        node->args.push_back(std::move(path));
    return has_path;
}
void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
/// Parses the single required path argument of `rm` and appends it to `node->args`.
/// Returns false when no path could be parsed.
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    const bool has_path = parseKeeperPath(pos, expected, path);
    if (has_path)
        node->args.push_back(std::move(path));
    return has_path;
}
void RMCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
client->zookeeper->remove(client->getAbsolutePath(query->args[0].safeGet<String>()));
}
/// Parses the single required path argument of `rmr` and appends it to `node->args`.
/// Returns false when no path could be parsed.
bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    const bool has_path = parseKeeperPath(pos, expected, path);
    if (has_path)
        node->args.push_back(std::move(path));
    return has_path;
}
/// Schedules a recursive removal of the resolved path (first argument).
/// The removal itself is wrapped in a callback handed to `askConfirmation` together with the prompt text.
void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
    String path = client->getAbsolutePath(query->args[0].safeGet<String>());

    /// The callback captures the path by value so it does not depend on this frame.
    auto remove_subtree = [client, path]{ client->zookeeper->removeRecursive(path); };

    client->askConfirmation("You are going to recursively delete path " + path, std::move(remove_subtree));
}
/// `help` takes no arguments: nothing is consumed from the token stream and parsing always succeeds.
bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
}
/// Prints the help message of every command registered in `KeeperClient::commands`, one per line.
void HelpCommand::execute(const ASTKeeperQuery * /* query */, KeeperClient * /* client */) const
{
    for (const auto & registered : KeeperClient::commands)
    {
        const auto & command = registered.second;
        std::cout << command->getHelpMessage() << '\n';
    }
}
/// Parses a four-letter-word command: the current token must be a bare word of exactly four characters.
/// On success the word is appended to `node->args` and the position is advanced past it.
bool FourLetterWordCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    expected.add(pos, "four-letter-word command");

    const bool is_bare_word = pos->type == TokenType::BareWord;
    if (!is_bare_word || pos->end - pos->begin != 4)
        return false;

    node->args.push_back(String(pos->begin, pos->end));
    ++pos;
    return true;
}
void FourLetterWordCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
std::cout << client->executeFourLetterCommand(query->args[0].safeGet<String>()) << "\n";
}
}

View File

@ -0,0 +1,131 @@
#pragma once
#include "Parser.h"
namespace DB
{
class KeeperClient;
/// Interface of a single keeper-client command.
/// A command knows how to parse its own arguments into an ASTKeeperQuery and how to execute a parsed query.
class IKeeperClientCommand
{
public:
/// NOTE(review): declared here, but no definition or use is visible in this view — confirm it is still needed.
static const String name;
/// Parses the command's arguments starting at `pos` and stores them in `node->args`.
/// Returns false if the arguments are not valid for this command.
virtual bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const = 0;
/// Runs the command described by `query` against `client`.
virtual void execute(const ASTKeeperQuery * query, KeeperClient * client) const = 0;
/// One-line usage text, printed by the `help` command.
virtual String getHelpMessage() const = 0;
/// Name under which the command is registered and looked up.
virtual String getName() const = 0;
virtual ~IKeeperClientCommand() = default;
};
using Command = std::shared_ptr<IKeeperClientCommand>;
/// `ls [path]`: lists the children of a node.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class LSCommand : public IKeeperClientCommand
{
String getName() const override { return "ls"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "ls [path] -- Lists the nodes for the given path (default: cwd)"; }
};
/// `cd [path]`: changes the client's working path.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class CDCommand : public IKeeperClientCommand
{
String getName() const override { return "cd"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "cd [path] -- Change the working path (default `.`)"; }
};
/// `set <path> <value> [version]`: updates the value of a node.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class SetCommand : public IKeeperClientCommand
{
String getName() const override { return "set"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "set <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
}
};
/// `create <path> <value>`: creates a new node with the given value.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class CreateCommand : public IKeeperClientCommand
{
String getName() const override { return "create"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "create <path> <value> -- Creates new node"; }
};
/// `get <path>`: prints the value of a node.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class GetCommand : public IKeeperClientCommand
{
String getName() const override { return "get"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "get <path> -- Returns the node's value"; }
};
/// `rm <path>`: removes a single node.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class RMCommand : public IKeeperClientCommand
{
    String getName() const override { return "rm"; }

    bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

    void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

    /// The usage text must start with the registered name (`rm`); it previously advertised `remove`,
    /// which is not a registered command name.
    String getHelpMessage() const override { return "rm <path> -- Remove the node"; }
};
/// `rmr <path>`: recursively removes a path after the user confirms.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class RMRCommand : public IKeeperClientCommand
{
String getName() const override { return "rmr"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "rmr <path> -- Recursively deletes path. Confirmation required"; }
};
/// `help`: prints the usage text of every registered command.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class HelpCommand : public IKeeperClientCommand
{
String getName() const override { return "help"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "help -- Prints this message"; }
};
/// `flwc <command>`: executes a four-letter-word command through the client.
/// The overrides are private; they are reached through the IKeeperClientCommand interface.
class FourLetterWordCommand : public IKeeperClientCommand
{
String getName() const override { return "flwc"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "flwc <command> -- Executes four-letter-word command"; }
};
}

View File

@ -1,9 +1,11 @@
#include "KeeperClient.h"
#include "Commands.h"
#include <Client/ReplxxLineReader.h>
#include <Client/ClientBase.h>
#include <Common/EventNotifier.h>
#include <Common/filesystemHelpers.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/parseQuery.h>
#include <boost/program_options.hpp>
@ -13,13 +15,6 @@ namespace fs = std::filesystem;
namespace DB
{
static const NameSet four_letter_word_commands
{
"ruok", "mntr", "srvr", "stat", "srst", "conf",
"cons", "crst", "envi", "dirs", "isro", "wchs",
"wchc", "wchp", "dump", "csnp", "lgif", "rqld",
};
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
@ -54,7 +49,7 @@ void KeeperClient::askConfirmation(const String & prompt, std::function<void()>
confirmation_callback = callback;
}
String KeeperClient::getAbsolutePath(const String & relative)
String KeeperClient::getAbsolutePath(const String & relative) const
{
String result;
if (relative.starts_with('/'))
@ -68,16 +63,20 @@ String KeeperClient::getAbsolutePath(const String & relative)
return result;
}
void KeeperClient::loadCommands(std::vector<std::tuple<String, size_t, Callback>> && new_commands)
void KeeperClient::loadCommands(std::vector<Command> && new_commands)
{
for (const auto & [name, args_count, callback] : new_commands)
std::vector<String> suggestions;
for (const auto & command : new_commands)
{
commands.insert({{name, args_count}, callback});
suggest.addWords({name});
String name = command->getName();
commands.insert({name, command});
suggestions.push_back(std::move(name));
}
for (const auto & command : four_letter_word_commands)
suggest.addWords({command});
suggestions.push_back(command);
suggest.addWords(std::move(suggestions));
}
void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
@ -132,61 +131,15 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
void KeeperClient::initialize(Poco::Util::Application & /* self */)
{
loadCommands({
{"set", 2, [](KeeperClient * client, const std::vector<String> & args)
{
client->zookeeper->set(client->getAbsolutePath(args[1]), args[2]);
}},
{"create", 2, [](KeeperClient * client, const std::vector<String> & args)
{
client->zookeeper->create(client->getAbsolutePath(args[1]), args[2], zkutil::CreateMode::Persistent);
}},
{"get", 1, [](KeeperClient * client, const std::vector<String> & args)
{
std::cout << client->zookeeper->get(client->getAbsolutePath(args[1])) << "\n";
}},
{"ls", 0, [](KeeperClient * client, const std::vector<String> & /* args */)
{
auto children = client->zookeeper->getChildren(client->cwd);
for (auto & child : children)
std::cout << child << " ";
std::cout << "\n";
}},
{"ls", 1, [](KeeperClient * client, const std::vector<String> & args)
{
auto children = client->zookeeper->getChildren(client->getAbsolutePath(args[1]));
for (auto & child : children)
std::cout << child << " ";
std::cout << "\n";
}},
{"cd", 0, [](KeeperClient * /* client */, const std::vector<String> & /* args */)
{
}},
{"cd", 1, [](KeeperClient * client, const std::vector<String> & args)
{
auto new_path = client->getAbsolutePath(args[1]);
if (!client->zookeeper->exists(new_path))
std::cerr << "Path " << new_path << " does not exists\n";
else
client->cwd = new_path;
}},
{"rm", 1, [](KeeperClient * client, const std::vector<String> & args)
{
client->zookeeper->remove(client->getAbsolutePath(args[1]));
}},
{"rmr", 1, [](KeeperClient * client, const std::vector<String> & args)
{
String path = client->getAbsolutePath(args[1]);
client->askConfirmation("You are going to recursively delete path " + path,
[client, path]{ client->zookeeper->removeRecursive(path); });
}},
std::make_shared<LSCommand>(),
std::make_shared<CDCommand>(),
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<HelpCommand>(),
std::make_shared<FourLetterWordCommand>(),
});
String home_path;
@ -234,32 +187,31 @@ bool KeeperClient::processQueryText(const String & text)
if (exit_strings.find(text) != exit_strings.end())
return false;
std::vector<String> tokens;
boost::algorithm::split(tokens, text, boost::is_any_of(" "));
try
{
if (need_confirmation)
{
need_confirmation = false;
if (tokens.size() == 1 && (tokens[0] == "y" || tokens[0] == "Y"))
if (text.size() == 1 && (text == "y" || text == "Y"))
confirmation_callback();
return true;
}
else if (tokens.size() == 1 && tokens[0].size() == 4 && four_letter_word_commands.find(tokens[0]) != four_letter_word_commands.end())
std::cout << executeFourLetterCommand(tokens[0]) << "\n";
else
KeeperParser parser;
String message;
const char * begin = text.data();
ASTPtr res = tryParseQuery(parser, begin, begin + text.size(), message, true, "", false, 0, 0, false);
if (!res)
{
auto callback = commands.find({tokens[0], tokens.size() - 1});
if (callback == commands.end())
{
if (tokens[0].size() == 4 && tokens.size() == 1) /// Treat it like unrecognized four-letter command
std::cout << executeFourLetterCommand(tokens[0]) << "\n";
else
std::cerr << "No command found with name " << tokens[0] << " and args count " << tokens.size() - 1 << "\n";
}
else
callback->second(this, tokens);
std::cerr << message << "\n";
return true;
}
auto * query = res->as<ASTKeeperQuery>();
auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}
catch (Coordination::Exception & err)
{

View File

@ -1,10 +1,12 @@
#pragma once
#include "Parser.h"
#include "Commands.h"
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Client/LineReader.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <Parsers/ASTLiteral.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/Util/Application.h>
#include <filesystem>
@ -13,13 +15,16 @@
namespace DB
{
class KeeperClient;
/// Four-letter-word commands known to the client.
/// Declared `inline` rather than `static`: this definition lives in a header, and a `static` object
/// would be constructed separately in every translation unit that includes it.
inline const NameSet four_letter_word_commands
{
    "ruok", "mntr", "srvr", "stat", "srst", "conf",
    "cons", "crst", "envi", "dirs", "isro", "wchs",
    "wchc", "wchp", "dump", "csnp", "lgif", "rqld",
};
class KeeperClient: public Poco::Util::Application
{
public:
using Callback = std::function<void(KeeperClient *, const std::vector<String> &)>;
KeeperClient() = default;
void initialize(Poco::Util::Application & self) override;
@ -28,29 +33,31 @@ public:
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
void runInteractive();
void loadCommands(std::vector<std::tuple<String, size_t, Callback>> && new_commands);
bool processQueryText(const String & text);
void executeQuery(const String & query);
String getAbsolutePath(const String & relative) const;
void askConfirmation(const String & prompt, std::function<void()> && callback);
String executeFourLetterCommand(const String & command);
String getAbsolutePath(const String & relative);
void askConfirmation(const String & prompt, std::function<void()> && callback);
zkutil::ZooKeeperPtr zookeeper;
std::filesystem::path cwd = "/";
std::function<void()> confirmation_callback;
std::map<std::pair<String, size_t>, Callback> commands;
inline static std::map<String, Command> commands;
protected:
void runInteractive();
bool processQueryText(const String & text);
void executeQuery(const String & query);
void loadCommands(std::vector<Command> && new_commands);
String history_file;
LineReader::Suggest suggest;
zkutil::ZooKeeperPtr zookeeper;
zkutil::ZooKeeperArgs zk_args;
std::filesystem::path cwd = "/";
bool need_confirmation = false;
std::function<void()> confirmation_callback;
};
}

View File

@ -0,0 +1,94 @@
#include "Parser.h"
#include "KeeperClient.h"
namespace DB
{
/// Parses one generic argument into `result`.
/// A bare word is taken verbatim; anything else is delegated to `parseIdentifierOrStringLiteral`.
/// A whitespace token following the argument is skipped in both cases, whether or not parsing succeeded.
bool parseKeeperArg(IParser::Pos & pos, Expected & expected, String & result)
{
    expected.add(pos, getTokenName(TokenType::BareWord));

    bool parsed;
    if (pos->type == TokenType::BareWord)
    {
        result.assign(pos->begin, pos->end);
        ++pos;
        parsed = true;
    }
    else
    {
        parsed = parseIdentifierOrStringLiteral(pos, expected, result);
    }

    ParserToken{TokenType::Whitespace}.ignore(pos);
    return parsed;
}
/// Parses a keeper path into `path`.
/// A quoted identifier or string literal is taken as the whole path. Otherwise the path is the
/// concatenation of consecutive bare-word, slash and dot tokens.
/// On success a whitespace token following the path is consumed, so the next argument parser
/// starts on a significant token. Returns false (leaving `path` untouched) if no path was found.
bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
    expected.add(pos, "path");

    if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
    {
        if (!parseIdentifierOrStringLiteral(pos, expected, path))
            return false;

        /// Skip the separator here too, as the unquoted branch and parseKeeperArg do;
        /// otherwise a following argument (e.g. in `set "/a" value`) starts on the whitespace token and fails.
        ParserToken{TokenType::Whitespace}.ignore(pos);
        return true;
    }

    String result;
    while (pos->type == TokenType::BareWord || pos->type == TokenType::Slash || pos->type == TokenType::Dot)
    {
        result.append(pos->begin, pos->end);
        ++pos;
    }
    ParserToken{TokenType::Whitespace}.ignore(pos);

    if (result.empty())
        return false;

    path = std::move(result);
    return true;
}
/// Parses one keeper-client query: a command name followed by that command's arguments.
/// On success `node` receives an ASTKeeperQuery holding the command name and the parsed arguments.
bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    auto query = std::make_shared<ASTKeeperQuery>();

    /// Advertise every registered command and every known four-letter word as an expected token.
    for (const auto & registered : KeeperClient::commands)
        expected.add(pos, registered.first.data());
    for (const auto & word : four_letter_word_commands)
        expected.add(pos, word.data());

    if (pos->type != TokenType::BareWord)
        return false;

    String command_name(pos->begin, pos->end);
    Command command;

    if (const auto found = KeeperClient::commands.find(command_name); found != KeeperClient::commands.end())
    {
        /// Registered command: consume its name and the separator after it.
        command = found->second;
        ++pos;
        ParserToken{TokenType::Whitespace}.ignore(pos);
    }
    else if (command_name.size() == 4)
    {
        /// Treat it like four-letter command
        /// Since keeper server can potentially have different version we don't want to match this command with embedded list
        command = std::make_shared<FourLetterWordCommand>();
        command_name = command->getName();
        /// We also don't move the position, so the command will be parsed as an argument
    }
    else
    {
        return false;
    }

    query->command = command_name;

    if (!command->parse(pos, query, expected))
        return false;

    ParserToken{TokenType::Whitespace}.ignore(pos);

    node = query;
    return true;
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/IAST.h>
#include <Parsers/IParserBase.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
namespace DB
{
bool parseKeeperArg(IParser::Pos & pos, Expected & expected, String & result);
bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path);
/// AST node for one keeper-client query: the name of the command plus its parsed arguments.
class ASTKeeperQuery : public IAST
{
public:
String getID(char) const override { return "KeeperQuery"; }
/// Copies the node, including `command` and `args`.
ASTPtr clone() const override { return std::make_shared<ASTKeeperQuery>(*this); }
/// Name of the command to execute.
String command;
/// Arguments in the order the command's parser appended them.
std::vector<Field> args;
};
/// Parser for keeper-client queries; on success it produces an ASTKeeperQuery.
class KeeperParser : public IParserBase
{
protected:
/// Name of this parser.
const char * getName() const override { return "Keeper client query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -4,7 +4,7 @@
namespace DB
{
Tokens::Tokens(const char * begin, const char * end, size_t max_query_size)
Tokens::Tokens(const char * begin, const char * end, size_t max_query_size, bool skipp_insignificant)
{
Lexer lexer(begin, end, max_query_size);
@ -13,7 +13,7 @@ Tokens::Tokens(const char * begin, const char * end, size_t max_query_size)
{
Token token = lexer.nextToken();
stop = token.isEnd() || token.type == TokenType::ErrorMaxQuerySizeExceeded;
if (token.isSignificant())
if (token.isSignificant() || (!skipp_insignificant && !data.empty() && data.back().isSignificant()))
data.emplace_back(std::move(token));
} while (!stop);
}

View File

@ -24,7 +24,7 @@ private:
std::size_t last_accessed_index = 0;
public:
Tokens(const char * begin, const char * end, size_t max_query_size = 0);
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skipp_insignificant = true);
ALWAYS_INLINE inline const Token & operator[](size_t index)
{

View File

@ -233,10 +233,11 @@ ASTPtr tryParseQuery(
const std::string & query_description,
bool allow_multi_statements,
size_t max_query_size,
size_t max_parser_depth)
size_t max_parser_depth,
bool skipp_insignificant)
{
const char * query_begin = _out_query_end;
Tokens tokens(query_begin, all_queries_end, max_query_size);
Tokens tokens(query_begin, all_queries_end, max_query_size, skipp_insignificant);
/// NOTE: consider use UInt32 for max_parser_depth setting.
IParser::Pos token_iterator(tokens, static_cast<uint32_t>(max_parser_depth));

View File

@ -18,7 +18,8 @@ ASTPtr tryParseQuery(
bool allow_multi_statements, /// If false, check for non-space characters after semicolon and set error message if any.
size_t max_query_size, /// If (end - pos) > max_query_size and query is longer than max_query_size then throws "Max query size exceeded".
/// Disabled if zero. Is used in order to check query size if buffer can contains data for INSERT query.
size_t max_parser_depth);
size_t max_parser_depth,
bool skipp_insignificant = true); /// If true, lexer will skip all insignificant tokens (e.g. whitespaces)
/// Parse query or throw an exception with error message.