ClickHouse/dbms/src/Client/Client.cpp

976 lines
29 KiB
C++
Raw Normal View History

#define DBMS_CLIENT 1 /// Используется в Context.h
2012-05-08 05:42:05 +00:00
#include <unistd.h>
2012-03-25 03:47:13 +00:00
#include <stdlib.h>
2012-03-26 02:48:08 +00:00
#include <fcntl.h>
2012-05-09 08:16:09 +00:00
#include <signal.h>
2012-03-25 03:47:13 +00:00
#include <readline/readline.h>
2012-03-25 07:52:31 +00:00
#include <readline/history.h>
2012-03-25 03:47:13 +00:00
#include <iostream>
2012-03-26 02:48:08 +00:00
#include <fstream>
#include <iomanip>
2012-03-25 03:47:13 +00:00
2014-01-08 16:33:28 +00:00
#include <unordered_set>
2012-03-25 03:47:13 +00:00
#include <boost/assign/list_inserter.hpp>
#include <boost/program_options.hpp>
2012-03-25 03:47:13 +00:00
#include <Poco/File.h>
#include <Poco/SharedPtr.h>
#include <Poco/Util/Application.h>
#include <Yandex/Revision.h>
2012-03-26 02:48:08 +00:00
#include <statdaemons/Stopwatch.h>
2012-03-25 03:47:13 +00:00
#include <DB/Core/Exception.h>
#include <DB/Core/Types.h>
2012-05-09 13:12:38 +00:00
#include <DB/Core/QueryProcessingStage.h>
2012-03-25 03:47:13 +00:00
2012-05-08 11:19:00 +00:00
#include <DB/IO/ReadBufferFromFileDescriptor.h>
2012-03-25 07:52:31 +00:00
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/WriteBufferFromString.h>
2012-03-25 03:47:13 +00:00
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
2012-03-25 03:47:13 +00:00
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
2012-03-26 04:17:17 +00:00
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/formatAST.h>
2012-03-25 03:47:13 +00:00
#include <DB/Interpreters/Context.h>
2012-05-16 18:03:00 +00:00
#include <DB/Client/Connection.h>
#include "InterruptListener.h"
2012-03-25 03:47:13 +00:00
/** Клиент командной строки СУБД ClickHouse.
*/
namespace DB
{
2012-03-25 07:52:31 +00:00
using Poco::SharedPtr;
class ExternalTable
{
public:
std::string file;
std::string name;
std::string format;
std::vector<std::pair<std::string, std::string> > structure;
void write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;
std::cerr << "format " << format << std::endl;
std::cerr << "structure: \n";
for (size_t i = 0; i < structure.size(); ++i)
std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl;
}
ExternalTable(const boost::program_options::variables_map & external_options)
{
if (external_options.count("file"))
file = external_options["file"].as<std::string>();
else
throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("name"))
name = external_options["name"].as<std::string>();
else
throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("format"))
format = external_options["format"].as<std::string>();
else
throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("structure"))
{
std::vector<std::string> temp = external_options["structure"].as<std::vector<std::string>>();
std::string argument;
for (size_t i = 0; i < temp.size(); ++i)
argument = argument + temp[i] + " ";
std::vector<std::string> vals = split(argument, " ,");
if (vals.size() & 1)
throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS);
for (size_t i = 0; i < vals.size(); i += 2)
structure.push_back(std::make_pair(vals[i], vals[i+1]));
}
else if (external_options.count("types"))
{
std::vector<std::string> temp = external_options["types"].as<std::vector<std::string>>();
std::string argument;
for (size_t i = 0; i < temp.size(); ++i)
argument = argument + temp[i] + " ";
std::vector<std::string> vals = split(argument, " ,");
for (size_t i = 0; i < vals.size(); ++i)
structure.push_back(std::make_pair("_" + toString(i + 1), vals[i]));
}
else
throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
}
static std::vector<std::string> split(const std::string & s, const std::string &d)
{
std::vector<std::string> res;
std::string now;
for (size_t i = 0; i < s.size(); ++i)
{
if (d.find(s[i]) != std::string::npos)
{
if (!now.empty())
res.push_back(now);
now = "";
continue;
}
now += s[i];
}
if (!now.empty())
res.push_back(now);
return res;
}
};
2012-03-25 03:47:13 +00:00
class Client : public Poco::Util::Application
{
public:
Client() : is_interactive(true), stdin_is_not_tty(false),
2012-05-21 06:49:05 +00:00
format_max_block_size(0), std_in(STDIN_FILENO), std_out(STDOUT_FILENO), processed_rows(0),
2014-01-08 16:33:28 +00:00
rows_read_on_server(0), bytes_read_on_server(0), written_progress_chars(0), written_first_block(false)
{
}
2012-03-25 03:47:13 +00:00
private:
2014-01-08 16:33:28 +00:00
typedef std::unordered_set<String> StringSet;
2012-03-25 03:47:13 +00:00
StringSet exit_strings;
2012-05-08 05:42:05 +00:00
bool is_interactive; /// Использовать readline интерфейс или batch режим.
bool stdin_is_not_tty; /// stdin - не терминал.
2012-05-16 18:03:00 +00:00
SharedPtr<Connection> connection; /// Соединение с БД.
2012-03-25 03:47:13 +00:00
String query; /// Текущий запрос.
String format; /// Формат вывода результата в консоль.
size_t format_max_block_size; /// Максимальный размер блока при выводе в консоль.
2012-05-08 11:19:00 +00:00
String insert_format; /// Формат данных для INSERT-а при чтении их из stdin в batch режиме
size_t insert_format_max_block_size; /// Максимальный размер блока при чтении данных INSERT-а.
2012-03-25 03:47:13 +00:00
Context context;
2012-05-08 11:19:00 +00:00
/// Чтение из stdin для batch режима
ReadBufferFromFileDescriptor std_in;
2012-03-25 03:47:13 +00:00
/// Вывод в консоль
2012-03-25 07:52:31 +00:00
WriteBufferFromFileDescriptor std_out;
2012-03-25 03:47:13 +00:00
BlockOutputStreamPtr block_std_out;
2012-03-26 02:48:08 +00:00
String home_path;
/// Путь к файлу истории команд.
String history_file;
2012-03-26 04:17:17 +00:00
2012-05-21 06:49:05 +00:00
/// Строк прочитано или записано.
size_t processed_rows;
2012-03-26 04:17:17 +00:00
/// Распарсенный запрос. Оттуда берутся некоторые настройки (формат).
ASTPtr parsed_query;
/// Последнее полученное от сервера исключение.
ExceptionPtr last_exception;
2012-05-09 08:16:09 +00:00
2012-05-10 07:47:13 +00:00
Stopwatch watch;
2012-05-09 15:50:42 +00:00
size_t rows_read_on_server;
size_t bytes_read_on_server;
2012-05-09 15:15:45 +00:00
size_t written_progress_chars;
2012-05-09 16:34:41 +00:00
bool written_first_block;
2012-05-09 15:15:45 +00:00
/// Информация о внешних таблицах
std::vector<ExternalTable> external_tables;
2012-03-25 03:47:13 +00:00
void initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
boost::assign::insert(exit_strings)
("exit")("quit")("logout")
("учше")("йгше")("дщпщге")
("exit;")("quit;")("logout;")
2012-03-25 07:52:31 +00:00
("учше;")("йгше;")("дщпщге;")
("q")("й")("\\q")("\\Q");
2012-03-25 03:47:13 +00:00
2012-03-26 02:48:08 +00:00
const char * home_path_cstr = getenv("HOME");
if (home_path_cstr)
2012-03-26 02:48:08 +00:00
home_path = home_path_cstr;
2012-03-25 03:47:13 +00:00
if (config().has("config-file"))
loadConfiguration(config().getString("config-file"));
else if (Poco::File("./clickhouse-client.xml").exists())
loadConfiguration("./clickhouse-client.xml");
else if (!home_path.empty() && Poco::File(home_path + "/.clickhouse-client/config.xml").exists())
2012-03-26 02:48:08 +00:00
loadConfiguration(home_path + "/.clickhouse-client/config.xml");
2012-03-25 03:47:13 +00:00
else if (Poco::File("/etc/clickhouse-client/config.xml").exists())
loadConfiguration("/etc/clickhouse-client/config.xml");
}
2012-03-26 02:48:08 +00:00
2012-03-25 03:47:13 +00:00
int main(const std::vector<std::string> & args)
{
2012-03-26 02:48:08 +00:00
try
{
return mainImpl(args);
}
catch (const Exception & e)
2012-03-26 02:48:08 +00:00
{
2013-08-10 09:04:45 +00:00
std::string text = e.displayText();
std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl;
/// Если есть стек-трейс на сервере, то не будем писать стек-трейс на клиенте.
if (std::string::npos == text.find("Stack trace"))
std::cerr << "Stack trace:" << std::endl
<< e.getStackTrace().toString();
return e.code();
2012-03-26 02:48:08 +00:00
}
catch (const Poco::Exception & e)
{
std::cerr << "Poco::Exception: " << e.displayText() << std::endl;
return ErrorCodes::POCO_EXCEPTION;
2012-03-26 02:48:08 +00:00
}
catch (const std::exception & e)
{
std::cerr << "std::exception: " << e.what() << std::endl;
return ErrorCodes::STD_EXCEPTION;
2012-03-26 02:48:08 +00:00
}
catch (...)
{
std::cerr << "Unknown exception" << std::endl;
return ErrorCodes::UNKNOWN_EXCEPTION;
2012-03-26 02:48:08 +00:00
}
}
int mainImpl(const std::vector<std::string> & args)
{
2012-05-08 05:42:05 +00:00
/** Будем работать в batch режиме, если выполнено одно из следующих условий:
* - задан параметр -e (--query)
* (в этом случае - запрос или несколько запросов берём оттуда;
* а если при этом stdin не терминал, то берём оттуда данные для INSERT-а первого запроса).
* - stdin - не терминал (в этом случае, считываем оттуда запросы);
*/
stdin_is_not_tty = !isatty(STDIN_FILENO);
if (stdin_is_not_tty || config().has("query"))
is_interactive = false;
2012-05-09 08:16:09 +00:00
2012-03-26 04:17:17 +00:00
std::cout << std::fixed << std::setprecision(3);
2012-05-08 05:42:05 +00:00
std::cerr << std::fixed << std::setprecision(3);
2012-03-26 02:48:08 +00:00
2012-05-08 05:42:05 +00:00
if (is_interactive)
std::cout << "ClickHouse client version " << DBMS_VERSION_MAJOR
<< "." << DBMS_VERSION_MINOR
<< "." << Revision::get()
<< "." << std::endl;
2012-03-25 03:47:13 +00:00
2012-05-16 18:03:00 +00:00
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
2012-03-25 03:47:13 +00:00
format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE);
2012-05-21 06:49:05 +00:00
insert_format = "Values";
insert_format_max_block_size = config().getInt("insert_format_max_block_size", format_max_block_size);
2012-05-16 18:03:00 +00:00
connect();
2012-05-08 05:42:05 +00:00
if (is_interactive)
2012-03-26 02:48:08 +00:00
{
2012-05-08 05:42:05 +00:00
/// Отключаем tab completion.
rl_bind_key('\t', rl_insert);
/// Загружаем историю команд, если есть.
if (config().has("history_file"))
history_file = config().getString("history_file");
else if (!home_path.empty())
history_file = home_path + "/.clickhouse-client-history";
if (!history_file.empty())
2012-05-08 05:42:05 +00:00
{
if (Poco::File(history_file).exists())
{
int res = read_history(history_file.c_str());
if (res)
throwFromErrno("Cannot read history from file " + history_file, ErrorCodes::CANNOT_READ_HISTORY);
}
else /// Создаём файл с историей.
Poco::File(history_file).createFile();
2012-05-08 05:42:05 +00:00
}
2012-08-31 18:44:05 +00:00
/// Инициализируем DateLUT, чтобы потраченное время не отображалось, как время, потраченное на запрос.
DateLUTSingleton::instance();
2012-08-31 18:44:05 +00:00
2012-05-08 05:42:05 +00:00
loop();
std::cout << "Bye." << std::endl;
return 0;
2012-03-26 02:48:08 +00:00
}
2012-05-08 05:42:05 +00:00
else
{
2012-05-08 05:42:05 +00:00
nonInteractive();
if (last_exception)
return last_exception->code();
return 0;
}
2012-03-25 03:47:13 +00:00
}
2012-05-09 13:12:38 +00:00
void connect()
{
String host = config().getString("host", "localhost");
2012-07-06 18:12:52 +00:00
UInt16 port = config().getInt("port", DBMS_DEFAULT_PORT);
2012-05-30 06:46:57 +00:00
String default_database = config().getString("database", "");
2013-08-10 09:04:45 +00:00
String user = config().getString("user", "");
String password = config().getString("password", "");
2012-05-30 06:46:57 +00:00
2012-05-16 18:03:00 +00:00
Protocol::Compression::Enum compression = config().getBool("compression", true)
? Protocol::Compression::Enable
: Protocol::Compression::Disable;
2012-05-09 13:12:38 +00:00
if (is_interactive)
2013-08-10 09:04:45 +00:00
std::cout << "Connecting to "
<< (!default_database.empty() ? "database " + default_database + " at " : "")
<< host << ":" << port
<< (!user.empty() ? " as user " + user : "")
<< "." << std::endl;
2012-05-09 13:12:38 +00:00
2013-08-10 09:04:45 +00:00
connection = new Connection(host, port, default_database, user, password, context.getDataTypeFactory(), "client", compression,
2012-07-26 20:16:57 +00:00
Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
2012-05-16 18:20:45 +00:00
2012-05-21 06:49:05 +00:00
if (is_interactive)
{
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_revision = 0;
2012-05-16 18:20:45 +00:00
2012-05-21 06:49:05 +00:00
connection->getServerVersion(server_name, server_version_major, server_version_minor, server_revision);
2012-05-16 18:20:45 +00:00
std::cout << "Connected to " << server_name
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
<< "." << std::endl << std::endl;
2012-05-21 06:49:05 +00:00
}
2012-05-09 13:12:38 +00:00
}
static bool isWhitespace(char c)
{
return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f';
}
2012-05-09 13:12:38 +00:00
2012-03-25 03:47:13 +00:00
void loop()
{
String query;
String prev_query;
while (char * line_ = readline(query.empty() ? ":) " : ":-] "))
2012-03-25 03:47:13 +00:00
{
String line = line_;
2012-03-25 03:47:13 +00:00
free(line_);
size_t ws = line.size();
while (ws > 0 && isWhitespace(line[ws - 1]))
--ws;
if (ws == 0 && query.empty())
2012-03-26 04:17:17 +00:00
continue;
bool ends_with_semicolon = line[ws - 1] == ';';
bool ends_with_backslash = line[ws - 1] == '\\';
2012-03-26 04:17:17 +00:00
if (ends_with_backslash)
line = line.substr(0, ws - 1);
query += line;
if (!ends_with_backslash && (ends_with_semicolon || !config().hasOption("multiline")))
{
if (query != prev_query)
{
add_history(query.c_str());
if (!history_file.empty() && append_history(1, history_file.c_str()))
throwFromErrno("Cannot append history to file " + history_file, ErrorCodes::CANNOT_APPEND_HISTORY);
prev_query = query;
}
try
{
if (!process(query))
break;
}
catch (const Exception & e)
{
std::cerr << std::endl
<< "Exception on client:" << std::endl
<< "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl;
/** Эксепшен на клиенте в процессе обработки запроса может привести к рассинхронизации соединения.
* Установим соединение заново и позволим ввести следующий запрос.
*/
connect();
}
query = "";
}
else
{
query += '\n';
}
2012-03-25 03:47:13 +00:00
}
}
2012-05-08 05:42:05 +00:00
void nonInteractive()
{
if (config().has("query"))
process(config().getString("query"));
else
{
/** В случае, если параметр query не задан, то запрос будет читаться из stdin.
* При этом, запрос будет читаться не потоково (целиком в оперативку).
* Поддерживается только один запрос в stdin.
*/
String stdin_str;
{
ReadBufferFromFileDescriptor in(STDIN_FILENO);
WriteBufferFromString out(stdin_str);
copyData(in, out);
}
process(stdin_str);
2012-05-08 05:42:05 +00:00
}
}
2012-03-25 03:47:13 +00:00
bool process(const String & line)
{
if (exit_strings.end() != exit_strings.find(line))
return false;
2012-05-16 18:03:00 +00:00
block_std_out = NULL;
2012-05-10 07:47:13 +00:00
watch.restart();
2012-03-26 02:48:08 +00:00
2012-03-25 03:47:13 +00:00
query = line;
2012-03-26 04:17:17 +00:00
/// Некоторые части запроса выполняются на стороне клиента (форматирование результата). Поэтому, распарсим запрос.
if (!parseQuery())
return true;
2012-05-09 08:16:09 +00:00
2012-05-21 06:49:05 +00:00
processed_rows = 0;
2012-05-09 15:50:42 +00:00
rows_read_on_server = 0;
bytes_read_on_server = 0;
2012-05-09 16:34:41 +00:00
written_progress_chars = 0;
written_first_block = false;
2012-05-09 13:12:38 +00:00
2012-06-25 01:22:30 +00:00
/// Запрос INSERT (но только тот, что требует передачи данных - не INSERT SELECT), обрабатывается отдельным способом.
const ASTInsertQuery * insert = dynamic_cast<const ASTInsertQuery *>(&*parsed_query);
if (insert && !insert->select)
2012-05-21 06:49:05 +00:00
processInsertQuery();
else
processOrdinaryQuery();
2012-03-25 03:47:13 +00:00
2012-05-08 05:42:05 +00:00
if (is_interactive)
{
2012-05-08 05:42:05 +00:00
std::cout << std::endl
<< processed_rows << " rows in set. Elapsed: " << watch.elapsedSeconds() << " sec. ";
2013-11-03 05:12:04 +00:00
if (rows_read_on_server >= 1000)
writeFinalProgress();
std::cout << std::endl << std::endl;
}
2012-03-26 02:48:08 +00:00
2012-03-25 03:47:13 +00:00
return true;
}
2012-05-21 06:49:05 +00:00
/// Обработать запрос, который не требует передачи блоков данных на сервер.
void processOrdinaryQuery()
{
connection->sendQuery(query, "", QueryProcessingStage::Complete);
connection->sendTemporaryTables();
2012-05-21 06:49:05 +00:00
receiveResult();
}
/// Обработать запрос, который требует передачи блоков данных на сервер.
void processInsertQuery()
{
/// Отправляем часть запроса - без данных, так как данные будут отправлены отдельно.
const ASTInsertQuery & parsed_insert_query = dynamic_cast<const ASTInsertQuery &>(*parsed_query);
String query_without_data = parsed_insert_query.data
? query.substr(0, parsed_insert_query.data - query.data())
: query;
if ((is_interactive && !parsed_insert_query.data) || (stdin_is_not_tty && std_in.eof()))
2012-05-28 19:57:44 +00:00
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete);
connection->sendTemporaryTables();
2012-05-21 06:49:05 +00:00
/// Получим структуру таблицы
Block sample = receiveSampleBlock();
sendData(sample);
receivePacket();
}
2012-03-26 04:17:17 +00:00
bool parseQuery()
{
ParserQuery parser;
const char * expected = "";
2012-03-26 04:17:17 +00:00
const char * begin = query.data();
const char * end = begin + query.size();
const char * pos = begin;
bool parse_res = parser.parse(pos, end, parsed_query, expected);
/// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой.
if (!parse_res || (pos != end && *pos != ';'))
{
std::stringstream message;
message << "Syntax error: failed at position "
2012-03-26 04:17:17 +00:00
<< (pos - begin) << ": "
<< std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
<< ", expected " << (parse_res ? "end of query" : expected) << "."
<< std::endl << std::endl;
if (is_interactive)
std::cerr << message.str();
else
throw Exception(message.str(), ErrorCodes::SYNTAX_ERROR);
2012-03-26 04:17:17 +00:00
return false;
}
2012-05-08 05:42:05 +00:00
if (is_interactive)
{
std::cout << std::endl;
formatAST(*parsed_query, std::cout);
std::cout << std::endl << std::endl;
}
2012-03-26 04:17:17 +00:00
return true;
}
2012-05-21 06:49:05 +00:00
void sendData(Block & sample)
2012-05-08 11:19:00 +00:00
{
/// Если нужно отправить данные INSERT-а.
const ASTInsertQuery * parsed_insert_query = dynamic_cast<const ASTInsertQuery *>(&*parsed_query);
if (!parsed_insert_query)
return;
2012-05-28 19:34:55 +00:00
2012-05-08 11:19:00 +00:00
if (parsed_insert_query->data)
{
/// Отправляем данные из запроса.
2012-05-21 06:49:05 +00:00
ReadBuffer data_in(const_cast<char *>(parsed_insert_query->data), parsed_insert_query->end - parsed_insert_query->data, 0);
sendDataFrom(data_in, sample);
2012-05-08 11:19:00 +00:00
}
else if (!is_interactive)
2012-05-08 11:19:00 +00:00
{
/// Отправляем данные из stdin.
2012-05-21 06:49:05 +00:00
sendDataFrom(std_in, sample);
2012-05-08 11:19:00 +00:00
}
else
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
}
2012-05-21 06:49:05 +00:00
void sendDataFrom(ReadBuffer & buf, Block & sample)
2012-05-08 11:19:00 +00:00
{
2012-05-21 06:49:05 +00:00
String current_format = insert_format;
/// Формат может быть указан в INSERT запросе.
if (ASTInsertQuery * insert = dynamic_cast<ASTInsertQuery *>(&*parsed_query))
if (!insert->format.empty())
current_format = insert->format;
BlockInputStreamPtr block_std_in = new AsynchronousBlockInputStream(context.getFormatFactory().getInput(
current_format, buf, sample, insert_format_max_block_size, context.getDataTypeFactory()));
2012-05-21 06:49:05 +00:00
block_std_in->readPrefix();
while (true)
2012-05-08 11:19:00 +00:00
{
2012-05-21 06:49:05 +00:00
Block block = block_std_in->read();
connection->sendData(block);
processed_rows += block.rows();
if (!block)
break;
2012-05-08 11:19:00 +00:00
}
2012-05-21 06:49:05 +00:00
block_std_in->readSuffix();
2012-05-08 11:19:00 +00:00
}
2012-05-16 18:03:00 +00:00
/** Получает и обрабатывает пакеты из сервера.
* Также следит, не требуется ли прервать выполнение запроса.
*/
2012-03-25 03:47:13 +00:00
void receiveResult()
{
2012-05-09 08:16:09 +00:00
InterruptListener interrupt_listener;
bool cancelled = false;
while (true)
{
/** Проверим, не требуется ли остановить выполнение запроса (Ctrl+C).
* Если требуется - отправим об этом информацию на сервер.
* После чего, получим оставшиеся пакеты с сервера (чтобы не было рассинхронизации).
*/
if (!cancelled)
{
if (interrupt_listener.check())
{
2012-05-16 18:03:00 +00:00
connection->sendCancel();
2012-05-09 08:16:09 +00:00
cancelled = true;
if (is_interactive)
std::cout << "Cancelling query." << std::endl;
/// Повторное нажатие Ctrl+C приведёт к завершению работы.
interrupt_listener.unblock();
2012-05-09 08:16:09 +00:00
}
2012-05-16 18:03:00 +00:00
else if (!connection->poll(1000000))
2012-05-09 08:16:09 +00:00
continue; /// Если новых данных в ещё нет, то после таймаута продолжим проверять, не остановлено ли выполнение запроса.
}
2012-03-25 07:52:31 +00:00
2012-05-09 08:16:09 +00:00
if (!receivePacket())
break;
}
if (cancelled && is_interactive)
std::cout << "Query was cancelled." << std::endl;
2012-03-25 03:47:13 +00:00
}
2012-05-16 18:03:00 +00:00
/** Получить кусок результата или прогресс выполнения или эксепшен,
* и обработать пакет соответствующим образом.
* Возвращает true, если нужно продолжать чтение пакетов.
*/
2012-03-25 03:47:13 +00:00
bool receivePacket()
{
2012-05-16 18:03:00 +00:00
Connection::Packet packet = connection->receivePacket();
2012-03-25 03:47:13 +00:00
2012-05-16 18:03:00 +00:00
switch (packet.type)
2012-03-25 03:47:13 +00:00
{
case Protocol::Server::Data:
2012-05-16 18:03:00 +00:00
onData(packet.block);
return true;
case Protocol::Server::Progress:
onProgress(packet.progress);
return true;
2013-05-22 14:57:43 +00:00
case Protocol::Server::ProfileInfo:
onProfileInfo(packet.profile_info);
return true;
2012-03-25 03:47:13 +00:00
case Protocol::Server::Totals:
onTotals(packet.block);
return true;
case Protocol::Server::Extremes:
onExtremes(packet.block);
return true;
2012-05-08 05:42:05 +00:00
case Protocol::Server::Exception:
2012-05-16 18:03:00 +00:00
onException(*packet.exception);
last_exception = packet.exception;
2012-05-08 05:42:05 +00:00
return false;
2012-05-16 18:03:00 +00:00
case Protocol::Server::EndOfStream:
onEndOfStream();
2012-05-08 11:19:00 +00:00
return false;
2012-03-25 03:47:13 +00:00
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
2012-05-21 06:49:05 +00:00
/** Получить блок - пример структуры таблицы, в которую будут вставляться данные.
*/
Block receiveSampleBlock()
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
return packet.block;
default:
2012-06-19 22:46:02 +00:00
throw Exception("Unexpected packet from server (expected Data, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
2012-05-21 06:49:05 +00:00
}
}
2012-05-16 18:03:00 +00:00
void onData(Block & block)
2012-03-25 03:47:13 +00:00
{
2012-05-09 15:15:45 +00:00
if (written_progress_chars)
{
for (size_t i = 0; i < written_progress_chars; ++i)
std::cerr << "\b \b";
2012-05-09 16:34:41 +00:00
2012-05-09 15:15:45 +00:00
written_progress_chars = 0;
}
if (!block)
return;
2012-05-09 16:34:41 +00:00
processed_rows += block.rows();
if (!block_std_out)
2012-03-25 03:47:13 +00:00
{
String current_format = format;
2012-03-26 04:17:17 +00:00
/// Формат может быть указан в запросе.
if (ASTQueryWithOutput * query_with_output = dynamic_cast<ASTQueryWithOutput *>(&*parsed_query))
if (query_with_output->format)
if (ASTIdentifier * id = dynamic_cast<ASTIdentifier *>(&*query_with_output->format))
current_format = id->name;
block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block);
block_std_out->writePrefix();
2012-03-25 03:47:13 +00:00
}
/// Загаловочный блок с нулем строк использовался для инициализации block_std_out,
/// выводить его не нужно
if (block.rows() != 0)
{
block_std_out->write(block);
written_first_block = true;
}
std_out.next();
}
void onTotals(Block & block)
{
block_std_out->setTotals(block);
2012-03-25 03:47:13 +00:00
}
2012-05-08 05:42:05 +00:00
void onExtremes(Block & block)
{
block_std_out->setExtremes(block);
}
2012-05-08 05:42:05 +00:00
2012-05-16 18:03:00 +00:00
void onProgress(const Progress & progress)
2012-05-08 11:19:00 +00:00
{
2012-05-16 18:03:00 +00:00
rows_read_on_server += progress.rows;
bytes_read_on_server += progress.bytes;
2012-05-09 15:50:42 +00:00
writeProgress();
}
void writeProgress()
{
2012-05-09 15:15:45 +00:00
static size_t increment = 0;
static const char * indicators[8] =
{
2012-05-09 16:34:41 +00:00
"\033[1;30m→\033[0m",
"\033[1;31m↘\033[0m",
"\033[1;32m↓\033[0m",
"\033[1;33m↙\033[0m",
"\033[1;34m←\033[0m",
"\033[1;35m↖\033[0m",
"\033[1;36m↑\033[0m",
"\033[1;37m↗\033[0m",
2012-05-09 15:15:45 +00:00
};
if (is_interactive)
{
2012-05-10 07:47:13 +00:00
std::cerr << std::string(written_progress_chars, '\b');
2012-05-09 16:34:41 +00:00
std::stringstream message;
2012-05-10 07:47:13 +00:00
message << indicators[increment % 8]
<< std::fixed << std::setprecision(3)
<< " Progress: " << rows_read_on_server << " rows, " << bytes_read_on_server / 1000000.0 << " MB";
size_t elapsed_ns = watch.elapsed();
if (elapsed_ns)
message << " ("
<< rows_read_on_server * 1000000000.0 / elapsed_ns << " rows/s., "
2012-05-10 07:47:13 +00:00
<< bytes_read_on_server * 1000.0 / elapsed_ns << " MB/s.) ";
else
message << ". ";
2012-05-09 16:34:41 +00:00
written_progress_chars = message.str().size() - 13;
std::cerr << message.rdbuf();
2012-05-09 15:15:45 +00:00
++increment;
}
}
2012-05-16 18:03:00 +00:00
void writeFinalProgress()
{
std::cout << "Processed " << rows_read_on_server << " rows, " << bytes_read_on_server / 1000000.0 << " MB";
size_t elapsed_ns = watch.elapsed();
if (elapsed_ns)
std::cout << " ("
<< rows_read_on_server * 1000000000.0 / elapsed_ns << " rows/s., "
<< bytes_read_on_server * 1000.0 / elapsed_ns << " MB/s.) ";
else
std::cout << ". ";
}
2012-05-16 18:03:00 +00:00
void onException(const Exception & e)
{
std::cerr << "Received exception from server:" << std::endl
<< "Code: " << e.code() << ". " << e.displayText();
}
2013-05-22 14:57:43 +00:00
void onProfileInfo(const BlockStreamProfileInfo & profile_info)
{
if (profile_info.hasAppliedLimit() && block_std_out)
2013-05-22 14:57:43 +00:00
block_std_out->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
}
2012-05-16 18:03:00 +00:00
void onEndOfStream()
{
if (block_std_out)
block_std_out->writeSuffix();
std_out.next();
2012-05-16 18:03:00 +00:00
if (is_interactive && !written_first_block)
std::cout << "Ok." << std::endl;
}
2012-03-25 03:47:13 +00:00
public:
void init(int argc, char ** argv)
2012-03-25 03:47:13 +00:00
{
/// Останавливаем внутреннюю обработку командной строки
stopOptionsProcessing();
/// Перечисляем основные опции командной строки относящиеся к функциональности клиента
boost::program_options::options_description main_description("Main options");
main_description.add_options()
("config-file,c", boost::program_options::value<std::string> (), "config-file")
("host,h", boost::program_options::value<std::string> ()->default_value("localhost"), "host")
("port,p", boost::program_options::value<int> ()->default_value(9000), "port")
("user,u", boost::program_options::value<int> (), "user")
("password,p", boost::program_options::value<int> (), "password")
("query,q", boost::program_options::value<std::string> (), "query")
("database,d", boost::program_options::value<std::string> (), "database")
("multiline,m", "multiline")
;
/// Перечисляем опции командной строки относящиеся к внешним таблицам
boost::program_options::options_description external_description("Main options");
external_description.add_options()
("file", boost::program_options::value<std::string> (), "data file or - for stdin")
("name", boost::program_options::value<std::string> ()->default_value("_data"), "name of the table")
("format", boost::program_options::value<std::string> ()->default_value("TabSeparated"), "data format")
("structure", boost::program_options::value<std::vector<std::string>> ()->multitoken(), "structure")
("types", boost::program_options::value<std::vector<std::string>> ()->multitoken(), "types")
;
std::vector<int> positions;
positions.push_back(0);
for (int i = 1; i < argc; ++i)
if (strcmp(argv[i], "--external") == 0)
positions.push_back(i);
positions.push_back(argc);
/// Парсим основные опции командной строки
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(positions[1] - positions[0], argv, main_description), options);
for (size_t i = 1; i + 1 < positions.size(); ++i)
{
boost::program_options::variables_map external_options;
boost::program_options::store(boost::program_options::parse_command_line(
positions[i+1] - positions[i], &argv[positions[i]], external_description), external_options);
try
{
external_tables.push_back(ExternalTable(external_options));
}
catch (const Exception & e)
{
std::string text = e.displayText();
std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl;
std::cerr << "Table #" << i << std::endl;
exit(e.code());
}
}
/// Сохраняем полученные данные во внутренний конфиг
if (options.count("config-file"))
config().setString("config-file", options["config-file"].as<std::string>());
if (options.count("host"))
config().setString("host", options["host"].as<std::string>());
if (options.count("query"))
config().setString("query", options["query"].as<std::string>());
if (options.count("database"))
config().setString("database", options["database"].as<std::string>());
if (options.count("port"))
config().setInt("port", options["port"].as<int>());
if (options.count("user"))
config().setInt("user", options["user"].as<int>());
if (options.count("password"))
config().setInt("password", options["password"].as<int>());
if (options.count("multiline"))
config().setBool("multiline", true);
2012-03-25 03:47:13 +00:00
}
};
}
int main(int argc, char ** argv)
{
2012-03-26 02:48:08 +00:00
DB::Client client;
client.init(argc, argv);
return client.run();
2012-03-25 03:47:13 +00:00
}