Add basic commands for disk tool (list-disks, list, move, remove, link, copy, read, write) + tests

This commit is contained in:
Varinara 2022-04-08 10:52:16 +03:00
parent e39e5adf74
commit ed6e8176fe
21 changed files with 1136 additions and 1 deletions

View File

@ -64,6 +64,8 @@ option (ENABLE_CLICKHOUSE_KEEPER_CONVERTER "Util allows to convert ZooKeeper log
option (ENABLE_CLICKHOUSE_SU "A tool similar to 'su'" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_DISKS "A tool to manage disks" ${ENABLE_CLICKHOUSE_ALL})
if (NOT ENABLE_NURAFT)
# RECONFIGURE_MESSAGE_LEVEL should not be used here,
# since ENABLE_NURAFT is set to OFF for FreeBSD and Darwin.
@ -175,6 +177,12 @@ if(NOT (USE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES))
set(CLICKHOUSE_ONE_SHARED ON)
endif()
if (ENABLE_CLICKHOUSE_DISKS)
message(STATUS "Clickhouse disks mode: ON")
else()
message(STATUS "ClickHouse disks mode: OFF")
endif()
configure_file (config_tools.h.in ${ConfigIncludePath}/config_tools.h)
macro(clickhouse_target_link_split_lib target name)
@ -240,6 +248,7 @@ add_subdirectory (git-import)
add_subdirectory (bash-completion)
add_subdirectory (static-files-disk-uploader)
add_subdirectory (su)
add_subdirectory (disks)
if (ENABLE_CLICKHOUSE_KEEPER)
add_subdirectory (keeper)
@ -323,7 +332,8 @@ if (CLICKHOUSE_SPLIT_BINARY)
clickhouse-obfuscator
clickhouse-git-import
clickhouse-copier
clickhouse-static-files-disk-uploader)
clickhouse-static-files-disk-uploader
clickhouse-disks)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-odbc-bridge)
@ -403,6 +413,9 @@ else ()
if (ENABLE_CLICKHOUSE_INSTALL)
clickhouse_target_link_split_lib(clickhouse install)
endif ()
if (ENABLE_CLICKHOUSE_DISKS)
clickhouse_target_link_split_lib(clickhouse disks)
endif ()
set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SERVER)
@ -482,6 +495,11 @@ else ()
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper-converter" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-keeper-converter)
endif ()
if (ENABLE_CLICKHOUSE_DISKS)
add_custom_target (clickhouse-disks ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-disks DEPENDS clickhouse)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-disks" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-disks)
endif ()
add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_BUNDLE})

View File

@ -20,3 +20,4 @@
#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER
#cmakedefine01 ENABLE_CLICKHOUSE_STATIC_FILES_DISK_UPLOADER
#cmakedefine01 ENABLE_CLICKHOUSE_SU
#cmakedefine01 ENABLE_CLICKHOUSE_DISKS

View File

@ -0,0 +1,14 @@
set (CLICKHOUSE_DISKS_SOURCES DisksApp.cpp ICommand.cpp)
set (CLICKHOUSE_DISKS_LINK
PRIVATE
boost::program_options
clickhouse_aggregate_functions
clickhouse_common_config
clickhouse_common_io
clickhouse_functions
clickhouse_parsers
clickhouse_table_functions
)
clickhouse_program_add(disks)

View File

@ -0,0 +1,70 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandCopy : public ICommand
{
public:
CommandCopy()
{
command_name = "copy";
command_option_description.emplace(createOptionsDescription("Help Message for copy", getTerminalWidth()));
description = "Recursively copy data containing at `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
usage = "Usage: copy [OPTION]... <FROM_PATH> <TO_PATH>";
command_option_description->add_options()
("diskFrom", po::value<String>(), "set name for disk from which we do operations")
("diskTo", po::value<String>(), "set name for disk to which we do operations")
;
}
void processOptions(
Poco::Util::AbstractConfiguration & config,
po::variables_map & options) const override
{
if (options.count("diskFrom"))
config.setString("diskFrom", options["diskFrom"].as<String>());
if (options.count("diskTo"))
config.setString("diskTo", options["diskTo"].as<String>());
}
void executeImpl(
const ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 2)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name_from = config.getString("diskFrom", config.getString("disk", "default"));
String disk_name_to = config.getString("diskTo", config.getString("disk", "default"));
String path_from = pos_arguments[0];
String path_to = pos_arguments[1];
DiskPtr disk_from = global_context->getDisk(disk_name_from);
DiskPtr disk_to = global_context->getDisk(disk_name_to);
String full_path_from = fullPathWithValidate(disk_from, path_from);
String full_path_to = fullPathWithValidate(disk_to, path_to);
disk_from->copy(full_path_from, disk_to, full_path_to);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandCopy()
{
return std::make_unique<DB::CommandCopy>();
}

View File

@ -0,0 +1,56 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandLink : public ICommand
{
public:
CommandLink()
{
command_name = "link";
command_option_description.emplace(createOptionsDescription("Help Message for link", getTerminalWidth()));
description = "Create hardlink from `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
usage = "Usage: link [OPTION]... <FROM_PATH> <TO_PATH>";
}
void processOptions(
Poco::Util::AbstractConfiguration &,
po::variables_map &) const override{}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 2)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path_from = pos_arguments[0];
String path_to = pos_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path_from = fullPathWithValidate(disk, path_from);
String full_path_to = fullPathWithValidate(disk, path_to);
disk->createHardLink(full_path_from, full_path_to);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandLink()
{
return std::make_unique<DB::CommandLink>();
}

View File

@ -0,0 +1,58 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandList : public ICommand
{
public:
CommandList()
{
command_name = "list";
command_option_description.emplace(createOptionsDescription("Help Message for list", getTerminalWidth()));
description = "List files (the default disk is used by default)\nPath should be in format './' or './path' or 'path'";
usage = "Usage: list [OPTION]... <PATH>...";
}
void processOptions(
Poco::Util::AbstractConfiguration &,
po::variables_map &) const override{}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = pos_arguments[0];
std::vector<String> file_names;
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
disk->listFiles(full_path, file_names);
for (const auto & file_name : file_names)
std::cout << file_name << '\n';
}
};
}
std::unique_ptr <DB::ICommand> makeCommandList()
{
return std::make_unique<DB::CommandList>();
}

View File

@ -0,0 +1,49 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandListDisks : public ICommand
{
public:
CommandListDisks()
{
command_name = "list-disks";
command_option_description.emplace(createOptionsDescription("Help Message for list-disks", getTerminalWidth()));
description = "List disks names";
usage = "Usage: list-disks [OPTION]";
}
void processOptions(
Poco::Util::AbstractConfiguration &,
po::variables_map &) const override{}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration &) const override
{
if (pos_arguments.size() != 0)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
std::vector<String> disks_names;
for (const auto & [disk_name, _] : global_context->getDisksMap())
std::cout << disk_name << '\n';
}
};
}
std::unique_ptr <DB::ICommand> makeCommandListDisks()
{
return std::make_unique<DB::CommandListDisks>();
}

View File

@ -0,0 +1,59 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandMove : public ICommand
{
public:
CommandMove()
{
command_name = "move";
command_option_description.emplace(createOptionsDescription("Help Message for move", getTerminalWidth()));
description = "Move file or directory from `from_path` to `to_path`\nPath should be in format './' or './path' or 'path'";
usage = "Usage: move [OPTION]... <FROM_PATH> <TO_PATH>";
}
void processOptions(
Poco::Util::AbstractConfiguration &,
po::variables_map &) const override{}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 2)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path_from = pos_arguments[0];
String path_to = pos_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path_from = fullPathWithValidate(disk, path_from);
String full_path_to = fullPathWithValidate(disk, path_to);
if (disk->isFile(full_path_from))
disk->moveFile(full_path_from, full_path_to);
else
disk->moveDirectory(full_path_from, full_path_to);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandMove()
{
return std::make_unique<DB::CommandMove>();
}

View File

@ -0,0 +1,77 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandRead : public ICommand
{
public:
CommandRead()
{
command_name = "read";
command_option_description.emplace(createOptionsDescription("Help Message for read", getTerminalWidth()));
description = "read File `from_path` to `to_path` or to stdout\nPath should be in format './' or './path' or 'path'";
usage = "Usage: read [OPTION]... <FROM_PATH> <TO_PATH>\nor\nread [OPTION]... <FROM_PATH>";
command_option_description->add_options()
("output", po::value<String>(), "set path to file to which we are read")
;
}
void processOptions(
Poco::Util::AbstractConfiguration & config,
po::variables_map & options) const override
{
if (options.count("output"))
config.setString("output", options["output"].as<String>());
}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = pos_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String path_output = config.getString("output", "");
std::unique_ptr<ReadBufferFromFileBase> in = disk->readFile(full_path);
std::unique_ptr<WriteBufferFromFileBase> out;
if (path_output.empty())
{
out = std::make_unique<WriteBufferFromFileDescriptor>(STDOUT_FILENO);
}
else
{
String full_path_output = fullPathWithValidate(disk, path_output);
out = disk->writeFile(full_path_output);
}
copyData(*in, *out);
out->finalize();
}
};
}
std::unique_ptr <DB::ICommand> makeCommandRead()
{
return std::make_unique<DB::CommandRead>();
}

View File

@ -0,0 +1,54 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandRemove : public ICommand
{
public:
CommandRemove()
{
command_name = "remove";
command_option_description.emplace(createOptionsDescription("Help Message for remove", getTerminalWidth()));
description = "Remove file or directory with all children. Throws exception if file doesn't exists.\nPath should be in format './' or './path' or 'path'";
usage = "Usage: remove [OPTION]... <PATH>";
}
void processOptions(
Poco::Util::AbstractConfiguration &,
po::variables_map &) const override{}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = pos_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
disk->removeRecursive(full_path);
}
};
}
std::unique_ptr <DB::ICommand> makeCommandRemove()
{
return std::make_unique<DB::CommandRemove>();
}

View File

@ -0,0 +1,77 @@
#pragma once
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class CommandWrite : public ICommand
{
public:
CommandWrite()
{
command_name = "write";
command_option_description.emplace(createOptionsDescription("Help Message for write", getTerminalWidth()));
description = "Write File `from_path` or stdin to `to_path`";
usage = "Usage: write [OPTION]... <FROM_PATH> <TO_PATH>\nor\nstdin | write [OPTION]... <TO_PATH>\nPath should be in format './' or './path' or 'path'";
command_option_description->add_options()
("input", po::value<String>(), "set path to file to which we are write")
;
}
void processOptions(
Poco::Util::AbstractConfiguration & config,
po::variables_map & options) const override
{
if (options.count("input"))
config.setString("input", options["input"].as<String>());
}
void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const override
{
if (pos_arguments.size() != 1)
{
printHelpMessage();
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
String disk_name = config.getString("disk", "default");
String path = pos_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
String full_path = fullPathWithValidate(disk, path);
String path_input = config.getString("input", "");
std::unique_ptr<ReadBufferFromFileBase> in;
std::unique_ptr<WriteBufferFromFileBase> out = disk->writeFile(full_path);
if (path_input.empty())
{
in = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
}
else
{
String full_path_input = fullPathWithValidate(disk, path_input);
in = disk->readFile(full_path_input);
}
copyData(*in, *out);
out->finalize();
}
};
}
std::unique_ptr <DB::ICommand> makeCommandWrite()
{
return std::make_unique<DB::CommandWrite>();
}

181
programs/disks/DisksApp.cpp Normal file
View File

@ -0,0 +1,181 @@
#include "DisksApp.h"
#include <Disks/registerDisks.h>
#include <base/argsToConfig.h>
#include <Formats/registerFormats.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
size_t DisksApp::findCommandPos(std::vector<String> & common_arguments)
{
for (size_t i = 0; i < common_arguments.size(); i++)
if (supported_commands.contains(common_arguments[i]))
return i + 1;
return common_arguments.size();
}
void DisksApp::printHelpMessage(std::optional<ProgramOptionsDescription> & command_option_description)
{
std::optional<ProgramOptionsDescription> help_description = createOptionsDescription("Help Message for clickhouse-disks", getTerminalWidth());
help_description->add(command_option_description.value());
std::cout << "ClickHouse disk management tool\n";
std::cout << "Usage: ./clickhouse-disks [OPTION]\n";
std::cout << "clickhouse-disks\n\n";
for (const auto & command : supported_commands)
std::cout << command_descriptions[command]->command_name << "\t" << command_descriptions[command]->description << "\n\n";
std::cout << command_option_description.value() << '\n';
}
String DisksApp::getDefaultConfigFileName()
{
return "/etc/clickhouse-server/config.xml";
}
void DisksApp::loadConfiguration()
{
config_path = config().getString("config-file", getDefaultConfigFileName());
DB::ConfigProcessor config_processor(config_path, false, false);
config_processor.setConfigPath(fs::path(config_path).parent_path());
auto loaded_config = config_processor.loadConfig();
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
void DisksApp::addOptions(std::optional<ProgramOptionsDescription> & options_description,
boost::program_options::positional_options_description & positional_options_description
)
{
options_description->add_options()
("help,h", "print common help message")
("disk", po::value<String>(), "set disk name")
("config-file,C", po::value<String>(), "set config file")
("command_name", po::value<String>(&command_name), "name for command to do")
;
positional_options_description.add("command_name", 1);
supported_commands = {"list-disks", "list", "move", "remove", "link", "copy", "write", "read"};
command_descriptions.emplace("list-disks", makeCommandListDisks());
command_descriptions.emplace("list", makeCommandList());
command_descriptions.emplace("move", makeCommandMove());
command_descriptions.emplace("remove", makeCommandRemove());
command_descriptions.emplace("link", makeCommandLink());
command_descriptions.emplace("copy", makeCommandCopy());
command_descriptions.emplace("write", makeCommandWrite());
command_descriptions.emplace("read", makeCommandRead());
}
void DisksApp::processOptions()
{
if (options.count("config-file"))
config().setString("config-file", options["config-file"].as<String>());
if (options.count("disk"))
config().setString("disk", options["disk"].as<String>());
}
void DisksApp::init(std::vector<String> & common_arguments)
{
stopOptionsProcessing();
std::optional<ProgramOptionsDescription> options_description;
options_description.emplace(createOptionsDescription("clickhouse-disks", getTerminalWidth()));
po::positional_options_description positional_options_description;
addOptions(options_description, positional_options_description);
size_t command_pos = findCommandPos(common_arguments);
std::vector<String> global_flags(command_pos);
command_flags.resize(common_arguments.size() - command_pos);
copy(common_arguments.begin(), common_arguments.begin() + command_pos, global_flags.begin());
copy(common_arguments.begin() + command_pos, common_arguments.end(), command_flags.begin());
parseAndCheckOptions(options_description, positional_options_description, global_flags);
po::notify(options);
if (options.count("help"))
{
printHelpMessage(options_description);
exit(0);
}
if (!supported_commands.contains(command_name))
{
printHelpMessage(options_description);
throw DB::Exception("Bad Arguments", DB::ErrorCodes::BAD_ARGUMENTS);
}
processOptions();
}
void DisksApp::parseAndCheckOptions(std::optional<ProgramOptionsDescription> & options_description,
boost::program_options::positional_options_description & positional_options_description,
std::vector<String> & arguments)
{
auto parser = po::command_line_parser(arguments).options(options_description.value()).positional(positional_options_description).allow_unregistered();
po::parsed_options parsed = parser.run();
po::store(parsed, options);
}
int DisksApp::main(const std::vector<String> & /*args*/)
{
Poco::Logger::root().setLevel("trace");
Poco::Logger::root().setChannel(new Poco::FileChannel(config().getString("logger.clickhouse-disks", "/var/log/clickhouse-server/clickhouse-disks.log")));
loadConfiguration();
String path = config().getString("path", DBMS_DEFAULT_PATH);
registerDisks();
registerFormats();
shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::DISKS);
global_context->setPath(path);
command_descriptions[command_name]->execute(command_flags, global_context, config(), options);
return Application::EXIT_OK;
}
}
int mainEntryClickHouseDisks(int argc, char ** argv)
{
try
{
DB::DisksApp app;
std::vector<String> common_arguments{argv + 1, argv + argc};
app.init(common_arguments);
return app.run();
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
}
}

72
programs/disks/DisksApp.h Normal file
View File

@ -0,0 +1,72 @@
#pragma once
#include "CommandCopy.cpp"
#include "CommandLink.cpp"
#include "CommandList.cpp"
#include "CommandListDisks.cpp"
#include "CommandMove.cpp"
#include "CommandRead.cpp"
#include "CommandRemove.cpp"
#include "CommandWrite.cpp"
#include <Common/Config/ConfigProcessor.h>
#include <Loggers/Loggers.h>
#include <Client/ClientBase.h>
#include <Client/LocalConnection.h>
#include <Common/ProgressIndication.h>
#include <Common/StatusFile.h>
#include <Common/InterruptListener.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
namespace DB
{
using CommandPtr = std::unique_ptr<ICommand>;
class DisksApp : public Poco::Util::Application, public Loggers
{
public:
DisksApp() = default;
void init(std::vector<String> & common_arguments);
int main(const std::vector<String> & args) override;
protected:
void loadConfiguration();
static String getDefaultConfigFileName();
void addOptions(
std::optional<ProgramOptionsDescription> & options_description,
boost::program_options::positional_options_description & positional_options_description);
void processOptions();
void printHelpMessage(std::optional<ProgramOptionsDescription> & command_option_description);
size_t findCommandPos(std::vector<String> & common_arguments);
private:
void parseAndCheckOptions(
std::optional<ProgramOptionsDescription> & options_description,
boost::program_options::positional_options_description & positional_options_description,
std::vector<String> & arguments);
protected:
String config_path;
ContextMutablePtr global_context;
SharedContextHolder shared_context;
String command_name;
std::vector<String> command_flags;
std::unordered_set<String> supported_commands;
std::unordered_map<String, CommandPtr> command_descriptions;
po::variables_map options;
};
}

View File

@ -0,0 +1,55 @@
#include "ICommand.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void ICommand::printHelpMessage() const
{
std::cout << command_name << '\n';
std::cout << description << '\n';
std::cout << usage << '\n';
std::cout << command_option_description.value() << '\n';
}
String ICommand::fullPathWithValidate(const DiskPtr & disk, const String & path)
{
String full_path = (fs::absolute(disk->getPath()) / path).lexically_normal();
String disk_path = fs::path(disk->getPath());
if (!full_path.starts_with(disk_path))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Path {} must be inside disk path {}", path, disk->getPath());
return full_path;
}
void ICommand::execute(
const std::vector<String> & command_arguments,
const DB::ContextMutablePtr & global_context,
Poco::Util::AbstractConfiguration & config,
po::variables_map & options)
{
command_option_description->add_options()
("help,h", "print help message for list")
("command_arguments", po::value<std::vector<String>>(&pos_arguments), "command arguments for command")
;
positional_options_description.add("command_arguments", -1);
auto parser = po::command_line_parser(command_arguments).options(command_option_description.value()).positional(positional_options_description).allow_unregistered();
po::parsed_options parsed = parser.run();
po::store(parsed, options);
po::notify(options);
if (options.count("help"))
{
printHelpMessage();
exit(0);
}
processOptions(config, options);
executeImpl(global_context, config);
}
}

63
programs/disks/ICommand.h Normal file
View File

@ -0,0 +1,63 @@
#pragma once
#include <Disks/IDisk.h>
#include <Poco/Util/Application.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/copyData.h>
#include <boost/program_options.hpp>
#include <Common/TerminalSize.h>
#include <memory>
namespace DB
{
namespace po = boost::program_options;
using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map;
class ICommand
{
public:
ICommand() = default;
virtual ~ICommand() = default;
void execute(
const std::vector<String> & command_arguments,
const DB::ContextMutablePtr & global_context,
Poco::Util::AbstractConfiguration & config,
po::variables_map & options);
protected:
virtual void processOptions(
Poco::Util::AbstractConfiguration & config,
po::variables_map & options) const = 0;
virtual void executeImpl(
const DB::ContextMutablePtr & global_context,
const Poco::Util::AbstractConfiguration & config) const = 0;
void printHelpMessage() const;
static String fullPathWithValidate(const DiskPtr & disk, const String & path);
public:
String command_name;
String description;
protected:
std::optional<ProgramOptionsDescription> command_option_description;
String usage;
po::positional_options_description positional_options_description;
std::vector<String> pos_arguments;
};
}
std::unique_ptr <DB::ICommand> makeCommandCopy();
std::unique_ptr <DB::ICommand> makeCommandLink();
std::unique_ptr <DB::ICommand> makeCommandList();
std::unique_ptr <DB::ICommand> makeCommandListDisks();
std::unique_ptr <DB::ICommand> makeCommandMove();
std::unique_ptr <DB::ICommand> makeCommandRead();
std::unique_ptr <DB::ICommand> makeCommandRemove();
std::unique_ptr <DB::ICommand> makeCommandWrite();

View File

@ -0,0 +1,2 @@
int mainEntryClickHouseDisks(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseDisks(argc_, argv_); }

View File

@ -75,6 +75,9 @@ int mainEntryClickHouseStop(int argc, char ** argv);
int mainEntryClickHouseStatus(int argc, char ** argv);
int mainEntryClickHouseRestart(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_DISKS
int mainEntryClickHouseDisks(int argc, char ** argv);
#endif
int mainEntryClickHouseHashBinary(int, char **)
{
@ -144,6 +147,9 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"su", mainEntryClickHouseSU},
#endif
{"hash-binary", mainEntryClickHouseHashBinary},
#if ENABLE_CLICKHOUSE_DISKS
{"disks", mainEntryClickHouseDisks},
#endif
};
int printHelp(int, char **)

View File

@ -917,6 +917,7 @@ public:
CLIENT, /// clickhouse-client
LOCAL, /// clickhouse-local
KEEPER, /// clickhouse-keeper (also daemon)
DISKS, /// clickhouse-disks
};
ApplicationType getApplicationType() const;

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<clickhouse>
<path>/var/lib/clickhouse/</path>
<storage_configuration>
<disks>
<test1>
<type>local</type>
<path>/var/lib/clickhouse/path1/</path>
</test1>
<test2>
<type>local</type>
<path>/var/lib/clickhouse/path2/</path>
</test2>
</disks>
<policies>
<test1>
<volumes>
<main>
<disk>test1</disk>
</main>
</volumes>
</test1>
<test2>
<volumes>
<main>
<disk>test2</disk>
</main>
</volumes>
</test2>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,190 @@
from helpers.cluster import ClickHouseCluster
import pytest
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"disks_app_test",
main_configs=["config.xml"],
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def init_data(source):
source.query("DROP TABLE IF EXISTS test_table")
source.query(
"CREATE TABLE test_table(word String, value UInt64) "
"ENGINE=MergeTree() "
"ORDER BY word SETTINGS storage_policy = 'test1'"
)
source.query("INSERT INTO test_table(*) VALUES ('test1', 2)")
def test_disks_app_func_ld(started_cluster):
source = cluster.instances["disks_app_test"]
out = source.exec_in_container(["/usr/bin/clickhouse", "disks", "list-disks"])
disks = out.split("\n")
assert disks[0] == "default" and disks[1] == "test1" and disks[2] == "test2"
def test_disks_app_func_ls(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "list", "."]
)
files = out.split("\n")
assert files[0] == "store"
def test_disks_app_func_cp(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"copy",
"--diskFrom",
"test1",
"--diskTo",
"test2",
".",
".",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "list", "."]
)
files = out.split("\n")
assert files[0] == "path1"
def test_disks_app_func_ln(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"link",
"data/default/test_table",
"data/default/z_tester",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "list", "data/default/"]
)
files = out.split("\n")
assert "z_tester" in files
def test_disks_app_func_rm(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"copy",
"--diskFrom",
"test1",
"--diskTo",
"test2",
".",
".",
]
)
source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "remove", "path1"]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "list", "."]
)
files = out.split("\n")
assert files[0] == ""
def test_disks_app_func_mv(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"--disk",
"test1",
"move",
"store",
"old_store",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "list", "."]
)
files = out.split("\n")
assert files[0] == "old_store"
def test_disks_app_func_read_write(started_cluster):
source = cluster.instances["disks_app_test"]
source.exec_in_container(
[
"bash",
"-c",
"echo 'tester' |"
+ " ".join(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "write", "5.txt"]
),
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "read", "5.txt"]
)
files = out.split("\n")
assert files[0] == "tester"