Merge remote-tracking branch 'upstream/master'

This commit is contained in:
BayoNet 2018-03-26 13:58:09 +03:00
commit 168aed56e9
3 changed files with 19 additions and 5 deletions

View File

@ -110,6 +110,7 @@ private:
winsize terminal_size {}; /// Terminal size is needed to render progress bar.
std::unique_ptr<Connection> connection; /// Connection to DB.
String query_id; /// Current query_id.
String query; /// Current query.
String format; /// Query results output format.
@ -421,6 +422,8 @@ private:
if (is_interactive)
{
if (!query_id.empty())
throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
@ -455,6 +458,7 @@ private:
}
else
{
query_id = config().getString("query_id", "");
nonInteractive();
if (last_exception)
@ -816,7 +820,7 @@ private:
/// Process the query that doesn't require transfering data blocks to the server.
void processOrdinaryQuery()
{
connection->sendQuery(query, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
}
@ -834,7 +838,7 @@ private:
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
connection->sendQuery(query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
/// Receive description of table structure.
@ -1370,6 +1374,7 @@ public:
("ssl,s", "ssl")
("user,u", boost::program_options::value<std::string>(), "user")
("password", boost::program_options::value<std::string>(), "password")
("query_id", boost::program_options::value<std::string>(), "query_id")
("query,q", boost::program_options::value<std::string>(), "query")
("database,d", boost::program_options::value<std::string>(), "database")
("pager", boost::program_options::value<std::string>(), "pager")
@ -1458,6 +1463,8 @@ public:
config().setString("config-file", options["config-file"].as<std::string>());
if (options.count("host") && !options["host"].defaulted())
config().setString("host", options["host"].as<std::string>());
if (options.count("query_id"))
config().setString("query_id", options["query_id"].as<std::string>());
if (options.count("query"))
config().setString("query", options["query"].as<std::string>());
if (options.count("database"))

View File

@ -199,7 +199,7 @@
<timeout>0.1</timeout>
<interval>60</interval>
<root_path>one_min</root_path>
<hostname_in_path>true<hostname_in_path>
<hostname_in_path>true</hostname_in_path>
<metrics>true</metrics>
<events>true</events>

View File

@ -4,6 +4,8 @@
#include <thread>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
@ -574,9 +576,14 @@ void registerStorageKafka(StorageFactory & factory)
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
// Parse topic list and consumer group
// Parse topic list
Names topics;
topics.push_back(static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>());
String topic_arg = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
boost::split(topics, topic_arg , [](char c){ return c == ','; });
for(String & topic : topics)
boost::trim(topic);
// Parse consumer group
String group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
// Parse format from string