mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
[WIP]
This commit is contained in:
parent
cb6fcee307
commit
13b88886d4
@ -1,6 +1,7 @@
|
||||
set(CLICKHOUSE_CLIENT_SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/ConnectionParameters.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/Suggest.cpp
|
||||
)
|
||||
|
||||
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io clickhouse_parsers string_utils ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "TestHint.h"
|
||||
#include "ConnectionParameters.h"
|
||||
#include "Suggest.h"
|
||||
|
||||
#include <port/unistd.h>
|
||||
#include <stdlib.h>
|
||||
@ -405,6 +406,10 @@ private:
|
||||
if (print_time_to_stderr)
|
||||
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
|
||||
/// Load suggestion data from the server.
|
||||
Suggest::instance()->load(connection_parameters, config().getInt("suggestion_limit"));
|
||||
|
||||
/// Load command history if present.
|
||||
if (config().has("history_file"))
|
||||
history_file = config().getString("history_file");
|
||||
@ -420,7 +425,7 @@ private:
|
||||
if (!history_file.empty() && !Poco::File(history_file).exists())
|
||||
Poco::File(history_file).createFile();
|
||||
|
||||
LineReader lr(history_file, '\\', config().has("multiline") ? ';' : 0);
|
||||
LineReader lr(Suggest::instance(), history_file, '\\', config().has("multiline") ? ';' : 0);
|
||||
|
||||
do
|
||||
{
|
||||
@ -466,7 +471,12 @@ private:
|
||||
/// This is intended for testing purposes.
|
||||
if (config().getBool("always_load_suggestion_data", false))
|
||||
{
|
||||
#ifdef USE_REPLXX
|
||||
SCOPE_EXIT({ Suggest::instance().finalize(); });
|
||||
Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
|
||||
#else
|
||||
throw Exception("Command line suggestions cannot work without readline", ErrorCodes::BAD_ARGUMENTS);
|
||||
#endif
|
||||
}
|
||||
|
||||
query_id = config().getString("query_id", "");
|
||||
|
142
dbms/programs/client/Suggest.cpp
Normal file
142
dbms/programs/client/Suggest.cpp
Normal file
@ -0,0 +1,142 @@
|
||||
#include "Suggest.h"
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void Suggest::load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
|
||||
{
|
||||
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
|
||||
{
|
||||
try
|
||||
{
|
||||
Connection connection(
|
||||
connection_parameters.host,
|
||||
connection_parameters.port,
|
||||
connection_parameters.default_database,
|
||||
connection_parameters.user,
|
||||
connection_parameters.password,
|
||||
"client",
|
||||
connection_parameters.compression,
|
||||
connection_parameters.security);
|
||||
|
||||
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
}
|
||||
|
||||
/// Note that keyword suggestions are available even if we cannot load data from server.
|
||||
|
||||
std::sort(words.begin(), words.end());
|
||||
ready = true;
|
||||
});
|
||||
}
|
||||
|
||||
Suggest::Suggest()
|
||||
{
|
||||
/// Keywords may be not up to date with ClickHouse parser.
|
||||
words = {"CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT",
|
||||
"MATERIALIZED", "ALIAS", "ENGINE", "AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP",
|
||||
"RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER", "COPY", "PROJECT",
|
||||
"PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO",
|
||||
"OUTFILE", "FORMAT", "TABLES", "DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE",
|
||||
"END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE", "INSERT", "VALUES",
|
||||
"SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER",
|
||||
"LEFT", "RIGHT", "FULL", "OUTER", "CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY",
|
||||
"WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC",
|
||||
"IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE"};
|
||||
}
|
||||
|
||||
void Suggest::loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit)
|
||||
{
|
||||
std::stringstream query;
|
||||
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
|
||||
"SELECT name FROM system.functions"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.table_engines"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.formats"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.table_functions"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.data_type_families"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.settings"
|
||||
" UNION ALL "
|
||||
"SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";
|
||||
|
||||
/// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
|
||||
if (suggestion_limit > 0)
|
||||
{
|
||||
String limit_str = toString(suggestion_limit);
|
||||
query <<
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.databases LIMIT " << limit_str
|
||||
<< " UNION ALL "
|
||||
"SELECT DISTINCT name FROM system.tables LIMIT " << limit_str
|
||||
<< " UNION ALL "
|
||||
"SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
|
||||
}
|
||||
|
||||
query << ") WHERE notEmpty(res)";
|
||||
|
||||
fetch(connection, timeouts, query.str());
|
||||
}
|
||||
|
||||
void Suggest::fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
|
||||
{
|
||||
connection.sendQuery(timeouts, query);
|
||||
|
||||
while (true)
|
||||
{
|
||||
Packet packet = connection.receivePacket();
|
||||
switch (packet.type)
|
||||
{
|
||||
case Protocol::Server::Data:
|
||||
fillWordsFromBlock(packet.block);
|
||||
continue;
|
||||
|
||||
case Protocol::Server::Progress:
|
||||
continue;
|
||||
case Protocol::Server::ProfileInfo:
|
||||
continue;
|
||||
case Protocol::Server::Totals:
|
||||
continue;
|
||||
case Protocol::Server::Extremes:
|
||||
continue;
|
||||
case Protocol::Server::Log:
|
||||
continue;
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
packet.exception->rethrow();
|
||||
return;
|
||||
|
||||
case Protocol::Server::EndOfStream:
|
||||
return;
|
||||
|
||||
default:
|
||||
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Suggest::fillWordsFromBlock(const Block & block)
|
||||
{
|
||||
if (!block)
|
||||
return;
|
||||
|
||||
if (block.columns() != 1)
|
||||
throw Exception("Wrong number of columns received for query to read words for suggestion", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
const ColumnString & column = typeid_cast<const ColumnString &>(*block.getByPosition(0).column);
|
||||
|
||||
size_t rows = block.rows();
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
words.emplace_back(column.getDataAt(i).toString());
|
||||
}
|
||||
|
||||
}
|
@ -2,18 +2,9 @@
|
||||
|
||||
#include "ConnectionParameters.h"
|
||||
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
#include <string.h>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
|
||||
#include <common/readline_use.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Client/Connection.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <common/LineReader.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -24,206 +15,34 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_PACKET_FROM_SERVER;
|
||||
}
|
||||
|
||||
class Suggest : private boost::noncopyable
|
||||
class Suggest : public LineReader::Suggest, boost::noncopyable
|
||||
{
|
||||
private:
|
||||
/// The vector will be filled with completion words from the server and sorted.
|
||||
using Words = std::vector<std::string>;
|
||||
|
||||
/// Keywords may be not up to date with ClickHouse parser.
|
||||
Words words
|
||||
{
|
||||
"CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT", "MATERIALIZED", "ALIAS", "ENGINE",
|
||||
"AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP", "RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER",
|
||||
"COPY", "PROJECT", "PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO", "OUTFILE", "FORMAT", "TABLES",
|
||||
"DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE", "END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE",
|
||||
"INSERT", "VALUES", "SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER", "LEFT", "RIGHT", "FULL", "OUTER",
|
||||
"CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY", "WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC", "IN",
|
||||
"KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE"
|
||||
};
|
||||
|
||||
/// Words are fetched asynchronously.
|
||||
std::thread loading_thread;
|
||||
std::atomic<bool> ready{false};
|
||||
|
||||
/// Points to current word to suggest.
|
||||
Words::const_iterator pos;
|
||||
/// Points after the last possible match.
|
||||
Words::const_iterator end;
|
||||
|
||||
/// Set iterators to the matched range of words if any.
|
||||
void findRange(const char * prefix, size_t prefix_length)
|
||||
{
|
||||
std::string prefix_str(prefix);
|
||||
std::tie(pos, end) = std::equal_range(words.begin(), words.end(), prefix_str,
|
||||
[prefix_length](const std::string & s, const std::string & prefix_searched) { return strncmp(s.c_str(), prefix_searched.c_str(), prefix_length) < 0; });
|
||||
}
|
||||
|
||||
/// Iterates through matched range.
|
||||
char * nextMatch()
|
||||
{
|
||||
if (pos >= end)
|
||||
return nullptr;
|
||||
|
||||
/// readline will free memory by itself.
|
||||
char * word = strdup(pos->c_str());
|
||||
++pos;
|
||||
return word;
|
||||
}
|
||||
|
||||
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit)
|
||||
{
|
||||
std::stringstream query;
|
||||
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
|
||||
"SELECT name FROM system.functions"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.table_engines"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.formats"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.table_functions"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.data_type_families"
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.settings"
|
||||
" UNION ALL "
|
||||
"SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";
|
||||
|
||||
/// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
|
||||
if (suggestion_limit > 0)
|
||||
{
|
||||
String limit_str = toString(suggestion_limit);
|
||||
query <<
|
||||
" UNION ALL "
|
||||
"SELECT name FROM system.databases LIMIT " << limit_str
|
||||
<< " UNION ALL "
|
||||
"SELECT DISTINCT name FROM system.tables LIMIT " << limit_str
|
||||
<< " UNION ALL "
|
||||
"SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
|
||||
}
|
||||
|
||||
query << ") WHERE notEmpty(res)";
|
||||
|
||||
fetch(connection, timeouts, query.str());
|
||||
}
|
||||
|
||||
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
|
||||
{
|
||||
connection.sendQuery(timeouts, query);
|
||||
|
||||
while (true)
|
||||
{
|
||||
Packet packet = connection.receivePacket();
|
||||
switch (packet.type)
|
||||
{
|
||||
case Protocol::Server::Data:
|
||||
fillWordsFromBlock(packet.block);
|
||||
continue;
|
||||
|
||||
case Protocol::Server::Progress:
|
||||
continue;
|
||||
case Protocol::Server::ProfileInfo:
|
||||
continue;
|
||||
case Protocol::Server::Totals:
|
||||
continue;
|
||||
case Protocol::Server::Extremes:
|
||||
continue;
|
||||
case Protocol::Server::Log:
|
||||
continue;
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
packet.exception->rethrow();
|
||||
return;
|
||||
|
||||
case Protocol::Server::EndOfStream:
|
||||
return;
|
||||
|
||||
default:
|
||||
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fillWordsFromBlock(const Block & block)
|
||||
{
|
||||
if (!block)
|
||||
return;
|
||||
|
||||
if (block.columns() != 1)
|
||||
throw Exception("Wrong number of columns received for query to read words for suggestion", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
const ColumnString & column = typeid_cast<const ColumnString &>(*block.getByPosition(0).column);
|
||||
|
||||
size_t rows = block.rows();
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
words.emplace_back(column.getDataAt(i).toString());
|
||||
}
|
||||
|
||||
public:
|
||||
static Suggest & instance()
|
||||
static Suggest * instance()
|
||||
{
|
||||
static Suggest instance;
|
||||
return instance;
|
||||
return &instance;
|
||||
}
|
||||
|
||||
/// More old server versions cannot execute the query above.
|
||||
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit);
|
||||
|
||||
/// Older server versions cannot execute the query above.
|
||||
static constexpr int MIN_SERVER_REVISION = 54406;
|
||||
|
||||
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
|
||||
{
|
||||
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
|
||||
{
|
||||
try
|
||||
{
|
||||
Connection connection(
|
||||
connection_parameters.host,
|
||||
connection_parameters.port,
|
||||
connection_parameters.default_database,
|
||||
connection_parameters.user,
|
||||
connection_parameters.password,
|
||||
"client",
|
||||
connection_parameters.compression,
|
||||
connection_parameters.security);
|
||||
|
||||
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
}
|
||||
|
||||
/// Note that keyword suggestions are available even if we cannot load data from server.
|
||||
|
||||
std::sort(words.begin(), words.end());
|
||||
ready = true;
|
||||
});
|
||||
}
|
||||
|
||||
void finalize()
|
||||
private:
|
||||
Suggest();
|
||||
~Suggest()
|
||||
{
|
||||
if (loading_thread.joinable())
|
||||
loading_thread.join();
|
||||
}
|
||||
|
||||
/// A function for readline.
|
||||
static char * generator(const char * text, int state)
|
||||
{
|
||||
Suggest & suggest = Suggest::instance();
|
||||
if (!suggest.ready)
|
||||
return nullptr;
|
||||
if (state == 0)
|
||||
suggest.findRange(text, strlen(text));
|
||||
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit);
|
||||
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
|
||||
void fillWordsFromBlock(const Block & block);
|
||||
|
||||
/// Do not append whitespace after word. For unknown reason, rl_completion_append_character = '\0' does not work.
|
||||
rl_completion_suppress_append = 1;
|
||||
|
||||
return suggest.nextMatch();
|
||||
}
|
||||
|
||||
~Suggest()
|
||||
{
|
||||
finalize();
|
||||
}
|
||||
/// Words are fetched asynchronously.
|
||||
std::thread loading_thread;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
#include <common/Types.h>
|
||||
|
||||
#include <vector>
|
||||
|
||||
#ifdef USE_REPLXX
|
||||
# include <replxx.hxx>
|
||||
#endif
|
||||
@ -9,7 +11,21 @@
|
||||
class LineReader
|
||||
{
|
||||
public:
|
||||
LineReader(const String & history_file_path, char extender, char delimiter = 0); /// if delimiter != 0, then it's multiline mode
|
||||
class Suggest
|
||||
{
|
||||
protected:
|
||||
using Words = std::vector<std::string>;
|
||||
using WordsRange = std::pair<Words::const_iterator, Words::const_iterator>;
|
||||
|
||||
Words words;
|
||||
std::atomic<bool> ready{false};
|
||||
|
||||
public:
|
||||
/// Get iterators for the matched range of words if any.
|
||||
WordsRange getCompletions(const String & prefix, size_t prefix_length) const;
|
||||
};
|
||||
|
||||
LineReader(const Suggest * suggest, const String & history_file_path, char extender, char delimiter = 0); /// if delimiter != 0, then it's multiline mode
|
||||
~LineReader();
|
||||
|
||||
/// Reads the whole line until delimiter (in multiline mode) or until the last line without extender.
|
||||
|
@ -13,12 +13,37 @@ void trim(String & s)
|
||||
|
||||
}
|
||||
|
||||
LineReader::LineReader(const String & history_file_path_, char extender_, char delimiter_)
|
||||
LineReader::Suggest::WordsRange LineReader::Suggest::getCompletions(const String & prefix, size_t prefix_length) const
|
||||
{
|
||||
if (!ready)
|
||||
return std::make_pair(words.end(), words.end());
|
||||
|
||||
return std::equal_range(
|
||||
words.begin(), words.end(), prefix, [prefix_length](const std::string & s, const std::string & prefix_searched)
|
||||
{
|
||||
return strncmp(s.c_str(), prefix_searched.c_str(), prefix_length) < 0;
|
||||
});
|
||||
}
|
||||
|
||||
LineReader::LineReader(const Suggest * suggest, const String & history_file_path_, char extender_, char delimiter_)
|
||||
: history_file_path(history_file_path_), extender(extender_), delimiter(delimiter_)
|
||||
{
|
||||
#ifdef USE_REPLXX
|
||||
if (!history_file_path.empty())
|
||||
rx.history_load(history_file_path);
|
||||
|
||||
auto callback = [suggest] (const String & context, size_t context_size)
|
||||
{
|
||||
auto range = suggest->getCompletions(context, context_size);
|
||||
return replxx::Replxx::completions_t(range.first, range.second);
|
||||
};
|
||||
|
||||
if (suggest)
|
||||
{
|
||||
rx.set_completion_callback(callback);
|
||||
rx.set_complete_on_empty(false);
|
||||
rx.set_word_break_characters(" \t\n\r\"\\'`@$><=;|&{(.");
|
||||
}
|
||||
#endif
|
||||
/// FIXME: check extender != delimiter
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ int main(int argc, char ** argv)
|
||||
Logger::root().setLevel("trace");
|
||||
|
||||
zkutil::ZooKeeper zk(argv[1]);
|
||||
LineReader lr({}, '\\');
|
||||
LineReader lr(nullptr, {}, '\\');
|
||||
|
||||
do
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user