mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merging [#CLICKHOUSE-3114]
This commit is contained in:
parent
f87761df90
commit
b2fcf06353
19
dbms/src/Common/PocoSessionPoolHelpers.cpp
Normal file
19
dbms/src/Common/PocoSessionPoolHelpers.cpp
Normal file
@ -0,0 +1,19 @@
|
||||
#include <mutex>
|
||||
#include <Poco/ThreadPool.h>
|
||||
#include <Common/PocoSessionPoolHelpers.h>
|
||||
|
||||
|
||||
std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr)
|
||||
{
|
||||
static std::mutex mutex;
|
||||
|
||||
Poco::ThreadPool & pool = Poco::ThreadPool::defaultPool();
|
||||
|
||||
/// NOTE: The lock don't guarantee that external users of the pool don't change its capacity
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
if (pool.available() == 0)
|
||||
pool.addCapacity(2 * std::max(pool.capacity(), 1));
|
||||
|
||||
return pool_constr();
|
||||
}
|
17
dbms/src/Common/PocoSessionPoolHelpers.h
Normal file
17
dbms/src/Common/PocoSessionPoolHelpers.h
Normal file
@ -0,0 +1,17 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
#include <Poco/Data/SessionPool.h>
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
|
||||
using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>;
|
||||
|
||||
/** Is used to adjust max size of default Poco thread pool. See issue #750
|
||||
* Acquire the lock, resize pool and construct new Session.
|
||||
*/
|
||||
static std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr);
|
@ -1,11 +1,6 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
#include <Poco/Data/SessionPool.h>
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
#include <Common/PocoSessionPoolHelpers.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Dictionaries/ODBCDictionarySource.h>
|
||||
#include <Dictionaries/ODBCBlockInputStream.h>
|
||||
@ -33,9 +28,11 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
|
||||
load_all_query{query_builder.composeLoadAllQuery()},
|
||||
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
|
||||
{
|
||||
pool = createAndCheckResizePocoSessionPool([&] () { return std::make_shared<Poco::Data::SessionPool>(
|
||||
config.getString(config_prefix + ".connector", "ODBC"),
|
||||
config.getString(config_prefix + ".connection_string"));
|
||||
pool = createAndCheckResizePocoSessionPool([&]
|
||||
{
|
||||
return std::make_shared<Poco::Data::SessionPool>(
|
||||
config.getString(config_prefix + ".connector", "ODBC"),
|
||||
config.getString(config_prefix + ".connection_string"));
|
||||
});
|
||||
}
|
||||
|
||||
@ -54,21 +51,6 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
|
||||
{
|
||||
}
|
||||
|
||||
std::shared_ptr<Poco::Data::SessionPool> ODBCDictionarySource::createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr)
|
||||
{
|
||||
static std::mutex mutex;
|
||||
|
||||
Poco::ThreadPool & pool = Poco::ThreadPool::defaultPool();
|
||||
|
||||
/// NOTE: The lock don't guarantee that external users of the pool don't change its capacity
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
|
||||
if (pool.available() == 0)
|
||||
pool.addCapacity(2 * std::max(pool.capacity(), 1));
|
||||
|
||||
return pool_constr();
|
||||
}
|
||||
|
||||
BlockInputStreamPtr ODBCDictionarySource::loadAll()
|
||||
{
|
||||
LOG_TRACE(log, load_all_query);
|
||||
|
@ -67,12 +67,6 @@ private:
|
||||
const std::string load_all_query;
|
||||
std::string invalidate_query;
|
||||
mutable std::string invalidate_query_response;
|
||||
|
||||
using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>;
|
||||
|
||||
/// Is used to adjust max size of default Poco thread pool. See issue #750
|
||||
/// Acquire the lock, resize pool and construct new Session
|
||||
static std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr);
|
||||
};
|
||||
|
||||
|
||||
|
@ -141,11 +141,6 @@ public:
|
||||
/// Create Set-s that we can from IN section to use the index on them.
|
||||
void makeSetsForIndex();
|
||||
|
||||
NamesAndTypesList & getUsedColumns()
|
||||
{
|
||||
return columns;
|
||||
}
|
||||
|
||||
private:
|
||||
ASTPtr ast;
|
||||
ASTSelectQuery * select_query;
|
||||
|
@ -24,7 +24,6 @@
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageLog.h>
|
||||
|
@ -21,7 +21,6 @@
|
||||
#include <Parsers/ASTWeightedZooKeeperPath.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/InterpreterAlterQuery.h>
|
||||
|
@ -1,207 +1,42 @@
|
||||
#include <Storages/StorageMySQL.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
#include <Storages/transformQueryForExternalDatabase.h>
|
||||
#include <Dictionaries/MySQLBlockInputStream.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
|
||||
static bool isCompatible(const DB::IAST & where)
|
||||
{
|
||||
const DB::ASTFunction * where_expression = typeid_cast<const DB::ASTFunction *>(&where);
|
||||
if (where_expression)
|
||||
{
|
||||
DB::String name = where_expression->name;
|
||||
if ((name == "and") || (name == "or"))
|
||||
{
|
||||
for (const auto & expr : where_expression->arguments->children)
|
||||
{
|
||||
if (!isCompatible(*expr.get()))
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else if (name == "equals" || (name == "notEquals") || (name == "greater") || (name == "less") || (name == "lessOrEquals")
|
||||
|| (name == "greaterOrEquals"))
|
||||
{
|
||||
const auto & children = where_expression->arguments->children;
|
||||
if ((children.size() != 2) || !isCompatible(*children[0].get()) || !isCompatible(*children[1].get()))
|
||||
return false;
|
||||
}
|
||||
else if (name == "not")
|
||||
{
|
||||
const auto & children = where_expression->arguments->children;
|
||||
if ((children.size() != 1) || !isCompatible(*children[0].get()))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
const DB::ASTLiteral * literal = typeid_cast<const DB::ASTLiteral *>(&where);
|
||||
if (literal)
|
||||
return true;
|
||||
const DB::ASTIdentifier * identifier = typeid_cast<const DB::ASTIdentifier *>(&where);
|
||||
if (identifier)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/** Implements storage in the MySQL database.
|
||||
* Use ENGINE = mysql(host_port, database_name, table_name, user_name, password)
|
||||
* Read only
|
||||
*/
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_FIND_FIELD;
|
||||
};
|
||||
|
||||
|
||||
StorageMySQL::StorageMySQL(const std::string & table_name_,
|
||||
const std::string & server_,
|
||||
int port_,
|
||||
const std::string & database_name_,
|
||||
const std::string & mysql_table_name_,
|
||||
const std::string & user_name_,
|
||||
const std::string & password_,
|
||||
const NamesAndTypesList & columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context_)
|
||||
: IStorage(materialized_columns_, alias_columns_, column_defaults_)
|
||||
, table_name(table_name_)
|
||||
, server(server_)
|
||||
, port(port_)
|
||||
, database_name(database_name_)
|
||||
, mysql_table_name(mysql_table_name_)
|
||||
, user_name(user_name_)
|
||||
, password(password_)
|
||||
StorageMySQL::StorageMySQL(
|
||||
const std::string & name,
|
||||
const std::string & host,
|
||||
UInt16 port,
|
||||
const std::string & remote_database_name,
|
||||
const std::string & remote_table_name,
|
||||
const std::string & user,
|
||||
const std::string & password,
|
||||
const NamesAndTypesList & columns)
|
||||
: name(name)
|
||||
, host(host), port(port)
|
||||
, remote_database_name(remote_database_name)
|
||||
, remote_table_name(remote_table_name)
|
||||
, user(user), password(password)
|
||||
, columns(columns_)
|
||||
, context_global(context_)
|
||||
, pool(database_name_, server_, user_name_, password_, port_)
|
||||
, pool(remote_database_name, host, user, password, port)
|
||||
{
|
||||
column_map.set_empty_key("");
|
||||
for (const auto & it : columns)
|
||||
{
|
||||
column_map[it.name] = it.type;
|
||||
}
|
||||
}
|
||||
|
||||
void dumpWhere(const IAST & where, std::stringstream & stream)
|
||||
{
|
||||
IAST::FormatSettings s(stream, false, true);
|
||||
where.format(s);
|
||||
}
|
||||
|
||||
/** Function puts to stream compatible expressions of where statement.
|
||||
* Compatible expressions are logical expressions on logical expressions or comparisons between fields or constants
|
||||
*/
|
||||
|
||||
void filterWhere(const IAST & where, std::stringstream & stream)
|
||||
{
|
||||
String name = where.getID();
|
||||
if (name == "Function_and")
|
||||
{
|
||||
const ASTFunction * and_expression = typeid_cast<const ASTFunction *>(&where);
|
||||
bool first = true;
|
||||
for (const auto & expr : and_expression->arguments->children)
|
||||
{
|
||||
if (isCompatible(*expr.get()))
|
||||
{
|
||||
if (!first)
|
||||
stream << " AND ";
|
||||
first = false;
|
||||
dumpWhere(*expr.get(), stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (isCompatible(where))
|
||||
{
|
||||
dumpWhere(where, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Function transformQueryForODBC builds select query of all used columns in query_info from table set by table_name parameter with where expression from filtered query_info.
|
||||
* Also it is builds sample_block with all columns, where types are found in column_map
|
||||
*/
|
||||
|
||||
std::string transformQueryForODBC(const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
std::string table_name,
|
||||
NamesAndTypesListPtr columns,
|
||||
google::dense_hash_map<std::string, DataTypePtr> & column_map,
|
||||
Block & sample_block)
|
||||
{
|
||||
BlockInputStreams res;
|
||||
StoragePtr storage(nullptr);
|
||||
ExpressionAnalyzer analyzer(query_info.query, context, storage, *columns);
|
||||
NamesAndTypesList & usedColumns = analyzer.getUsedColumns();
|
||||
std::stringstream iss;
|
||||
iss << "SELECT ";
|
||||
bool first = true;
|
||||
for (const auto & column : usedColumns)
|
||||
{
|
||||
if (!first)
|
||||
iss << ",";
|
||||
iss << column.name;
|
||||
first = false;
|
||||
ColumnWithTypeAndName col;
|
||||
col.name = column.name;
|
||||
google::dense_hash_map<std::string, DataTypePtr>::iterator it = column_map.find(column.name);
|
||||
if (it == column_map.end())
|
||||
{
|
||||
throw Exception("Can not find field " + column.name + " in table " + table_name, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||
}
|
||||
col.type = it->second;
|
||||
col.column = column.type->createColumn();
|
||||
sample_block.insert(std::move(col));
|
||||
}
|
||||
iss << " FROM " << table_name;
|
||||
const ASTSelectQuery * select_query = typeid_cast<ASTSelectQuery *>(query_info.query.get());
|
||||
const IAST * where = select_query->where_expression.get();
|
||||
if (where)
|
||||
{
|
||||
std::stringstream where_stream;
|
||||
// fprintf(stderr, "WHERE treeId: %s\n", where->getTreeID().c_str());
|
||||
filterWhere(*where, where_stream);
|
||||
std::string filtered_where = where_stream.str();
|
||||
if (filtered_where.size())
|
||||
{
|
||||
iss << " WHERE " << filtered_where;
|
||||
}
|
||||
// fprintf(stderr, "Filtered WHERE: %s\n", filtered_where.c_str());
|
||||
}
|
||||
return iss.str();
|
||||
}
|
||||
|
||||
BlockInputStreams StorageMySQL::read(const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
unsigned)
|
||||
{
|
||||
check(column_names);
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
BlockInputStreams res;
|
||||
sample_block.clear();
|
||||
std::string query = transformQueryForODBC(query_info, context, mysql_table_name, columns, column_map, sample_block);
|
||||
// fprintf(stderr, "Query: %s\n", query.c_str());
|
||||
res.push_back(std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size));
|
||||
return res;
|
||||
String query = transformQueryForExternalDatabase(*query_info.query, columns, remote_database_name, remote_table_name, context);
|
||||
return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, getSampleBlock(), max_block_size) };
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,83 +1,35 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
#include <Poco/Net/SocketAddress.h>
|
||||
|
||||
#include <mysqlxx/Pool.h>
|
||||
|
||||
#include <sparsehash/dense_hash_map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
inline std::string splitHostPort(const char * host_port, int & port)
|
||||
{
|
||||
Poco::Net::SocketAddress socket_address(host_port);
|
||||
port = socket_address.port();
|
||||
return socket_address.host().toString();
|
||||
}
|
||||
|
||||
/** Implements storage in the MySQL database.
|
||||
* Use ENGINE = mysql(host_port, database_name, table_name, user_name, password)
|
||||
* Read only.
|
||||
*/
|
||||
class StorageMySQL : public IStorage
|
||||
{
|
||||
public:
|
||||
StorageMySQL(const std::string & table_name_,
|
||||
const std::string & server_,
|
||||
int port_,
|
||||
const std::string & database_name_,
|
||||
const std::string & mysql_table_name_,
|
||||
const std::string & user_name_,
|
||||
const std::string & password_,
|
||||
const NamesAndTypesListPtr & columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context_);
|
||||
|
||||
static StoragePtr create(const std::string & table_name,
|
||||
const std::string & host_port,
|
||||
const std::string & database_name,
|
||||
const std::string & mysql_table_name,
|
||||
const std::string & user_name,
|
||||
StorageMySQL(
|
||||
const std::string & name,
|
||||
const std::string & host,
|
||||
UInt16 port,
|
||||
const std::string & remote_database_name,
|
||||
const std::string & remote_table_name,
|
||||
const std::string & user,
|
||||
const std::string & password,
|
||||
const NamesAndTypesListPtr & columns,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context)
|
||||
{
|
||||
std::string server;
|
||||
int port;
|
||||
server = splitHostPort(host_port.c_str(), port);
|
||||
return std::make_shared<StorageMySQL>(table_name,
|
||||
server,
|
||||
port,
|
||||
database_name,
|
||||
mysql_table_name,
|
||||
user_name,
|
||||
password,
|
||||
columns,
|
||||
materialized_columns_,
|
||||
alias_columns_,
|
||||
column_defaults_,
|
||||
context);
|
||||
}
|
||||
const NamesAndTypesList & columns);
|
||||
|
||||
std::string getName() const override
|
||||
{
|
||||
return "MySQL";
|
||||
}
|
||||
std::string getName() const override { return "MySQL"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
||||
|
||||
std::string getTableName() const override
|
||||
{
|
||||
return table_name;
|
||||
}
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override
|
||||
{
|
||||
return *columns;
|
||||
}
|
||||
|
||||
BlockInputStreams read(const Names & column_names,
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
@ -85,17 +37,17 @@ public:
|
||||
unsigned num_streams) override;
|
||||
|
||||
private:
|
||||
std::string table_name;
|
||||
std::string server;
|
||||
int port;
|
||||
std::string database_name;
|
||||
std::string mysql_table_name;
|
||||
std::string user_name;
|
||||
std::string name;
|
||||
|
||||
std::string host;
|
||||
UInt16 port;
|
||||
std::string remote_database_name;
|
||||
std::string remote_table_name;
|
||||
std::string user;
|
||||
std::string password;
|
||||
Block sample_block;
|
||||
NamesAndTypesListPtr columns;
|
||||
const Context & context_global;
|
||||
google::dense_hash_map<std::string, DataTypePtr> column_map;
|
||||
|
||||
NamesAndTypesList columns;
|
||||
mysqlxx::Pool pool;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,66 +1,42 @@
|
||||
#include <Common/PocoSessionPoolHelpers.h>
|
||||
#include <Storages/transformQueryForExternalDatabase.h>
|
||||
#include <Storages/StorageODBC.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
#include <Dictionaries/ODBCBlockInputStream.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/** Implements storage in the ODBC database.
|
||||
* Use ENGINE = odbc(odbc connection string, table name)
|
||||
* Example ENGINE = odbc('dsn=test', table)
|
||||
* Read only
|
||||
*/
|
||||
|
||||
std::string transformQueryForODBC(const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
std::string table_name,
|
||||
const NamesAndTypesList & columns,
|
||||
google::dense_hash_map<std::string, DataTypePtr> & column_map,
|
||||
Block & sample_block);
|
||||
|
||||
StorageODBC::StorageODBC(const std::string & table_name_,
|
||||
const std::string & database_name_,
|
||||
const std::string & odbc_table_name_,
|
||||
const NamesAndTypesList & columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context_)
|
||||
: IStorage(materialized_columns_, alias_columns_, column_defaults_)
|
||||
, table_name(table_name_)
|
||||
, database_name(database_name_)
|
||||
, odbc_table_name(odbc_table_name_)
|
||||
StorageODBC::StorageODBC(
|
||||
const std::string & name,
|
||||
const std::string & connection_string,
|
||||
const std::string & remote_database_name,
|
||||
const std::string & remote_table_name,
|
||||
const NamesAndTypesList & columns)
|
||||
: name(name)
|
||||
, connection_string(connection_string)
|
||||
, remote_database_name(remote_database_name)
|
||||
, remote_table_name(remote_table_name)
|
||||
, columns(columns_)
|
||||
, context_global(context_)
|
||||
, pool("ODBC", database_name_)
|
||||
{
|
||||
column_map.set_empty_key("");
|
||||
for (auto & it : *columns)
|
||||
pool = createAndCheckResizePocoSessionPool([&]
|
||||
{
|
||||
column_map[it.name] = it.type;
|
||||
}
|
||||
return std::make_shared<Poco::Data::SessionPool>("ODBC", connection_string);
|
||||
});
|
||||
}
|
||||
|
||||
BlockInputStreams StorageODBC::read(const Names & column_names,
|
||||
BlockInputStreams StorageODBC::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
check(column_names);
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
DB::BlockInputStreams res;
|
||||
sample_block.clear();
|
||||
std::string query = transformQueryForODBC(query_info, context, odbc_table_name, columns, column_map, sample_block);
|
||||
res.push_back(std::make_shared<ODBCBlockInputStream>(pool.get(), query, sample_block, max_block_size));
|
||||
return res;
|
||||
String query = transformQueryForExternalDatabase(*query_info.query, columns, remote_database_name, remote_table_name, context);
|
||||
return { std::make_shared<ODBCBlockInputStream>(pool->get(), query, getSampleBlock(), max_block_size) };
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,53 +2,40 @@
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
#include <Poco/Data/SessionPool.h>
|
||||
|
||||
#include <sparsehash/dense_hash_map>
|
||||
namespace Poco
|
||||
{
|
||||
namespace Data
|
||||
{
|
||||
class SessionPool;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Implements storage in the ODBC database.
|
||||
* Use ENGINE = odbc(connection_string, table_name)
|
||||
* Example ENGINE = odbc('dsn=test', table)
|
||||
* Read only.
|
||||
*/
|
||||
class StorageODBC : public IStorage
|
||||
{
|
||||
public:
|
||||
StorageODBC(const std::string & table_name_,
|
||||
const std::string & database_name_,
|
||||
const std::string & odbc_table_name_,
|
||||
const NamesAndTypesListPtr & columns_,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context_);
|
||||
StorageODBC(
|
||||
const std::string & name,
|
||||
const std::string & connection_string,
|
||||
const std::string & remote_database_name,
|
||||
const std::string & remote_table_name,
|
||||
const NamesAndTypesList & columns);
|
||||
|
||||
static StoragePtr create(const std::string & table_name,
|
||||
const std::string & database_name,
|
||||
const std::string & odbc_table_name,
|
||||
const NamesAndTypesListPtr & columns,
|
||||
const NamesAndTypesList & materialized_columns_,
|
||||
const NamesAndTypesList & alias_columns_,
|
||||
const ColumnDefaults & column_defaults_,
|
||||
const Context & context)
|
||||
{
|
||||
return std::make_shared<StorageODBC>(
|
||||
table_name, database_name, odbc_table_name, columns, materialized_columns_, alias_columns_, column_defaults_, context);
|
||||
}
|
||||
std::string getName() const override { return "ODBC"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
||||
|
||||
std::string getName() const override
|
||||
{
|
||||
return "ODBC";
|
||||
}
|
||||
|
||||
std::string getTableName() const override
|
||||
{
|
||||
return table_name;
|
||||
}
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override
|
||||
{
|
||||
return *columns;
|
||||
}
|
||||
|
||||
BlockInputStreams read(const Names & column_names,
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
@ -56,13 +43,14 @@ public:
|
||||
unsigned num_streams) override;
|
||||
|
||||
private:
|
||||
std::string table_name;
|
||||
std::string database_name;
|
||||
std::string odbc_table_name;
|
||||
Block sample_block;
|
||||
NamesAndTypesListPtr columns;
|
||||
const Context & context_global;
|
||||
google::dense_hash_map<std::string, DataTypePtr> column_map;
|
||||
Poco::Data::SessionPool pool;
|
||||
std::string name;
|
||||
|
||||
std::string connection_string;
|
||||
std::string remote_database_name;
|
||||
std::string remote_table_name;
|
||||
|
||||
NamesAndTypesList columns;
|
||||
|
||||
std::shared_ptr<Poco::Data::SessionPool> pool;
|
||||
};
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTOptimizeQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
|
110
dbms/src/Storages/transformQueryForExternalDatabase.cpp
Normal file
110
dbms/src/Storages/transformQueryForExternalDatabase.cpp
Normal file
@ -0,0 +1,110 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Parsers/IAST.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Storages/transformQueryForExternalDatabase.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static bool isCompatible(const IAST & node)
|
||||
{
|
||||
if (const ASTFunction * function = typeid_cast<const ASTFunction *>(&node))
|
||||
{
|
||||
String name = function->name;
|
||||
|
||||
if (!(name == "and"
|
||||
|| name == "or"
|
||||
|| name == "not"
|
||||
|| name == "equals"
|
||||
|| name == "notEquals"
|
||||
|| name == "greater"
|
||||
|| name == "less"
|
||||
|| name == "lessOrEquals"
|
||||
|| name == "greaterOrEquals"))
|
||||
return false;
|
||||
|
||||
for (const auto & expr : function->arguments->children)
|
||||
if (!isCompatible(*expr.get()))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (const ASTLiteral * literal = typeid_cast<const ASTLiteral *>(&node))
|
||||
{
|
||||
if (literal->value.type == Field::Type::Array
|
||||
|| literal->value.type == Field::Type::Tuple)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (typeid_cast<const ASTIdentifier *>(&node))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
String transformQueryForExternalDatabase(
|
||||
const IAST & query,
|
||||
const NamesAndTypesList & available_columns,
|
||||
const String & database,
|
||||
const String & table,
|
||||
const Context & context)
|
||||
{
|
||||
ExpressionAnalyzer analyzer(query.clone(), context, {}, available_columns);
|
||||
const Names & used_columns = analyzer.getRequiredColumns();
|
||||
|
||||
auto select = std::make_shared<ASTSelectQuery>();
|
||||
|
||||
select->replaceDatabaseAndTable(database, table);
|
||||
|
||||
auto select_expr_list = std::make_shared<ASTExpressionList>();
|
||||
for (const auto & name : used_columns)
|
||||
select_expr_list->children.push_back(std::make_shared<ASTIdentifier>(StringRange(), name));
|
||||
|
||||
select->select_expression_list = std::move(select_expr_list);
|
||||
|
||||
/** If there was WHERE,
|
||||
* copy it to transformed query if it is compatible,
|
||||
* or if it is AND expression,
|
||||
* copy only compatible parts of it.
|
||||
*/
|
||||
|
||||
const ASTPtr & original_where = typeid_cast<const ASTSelectQuery &>(query).where_expression;
|
||||
if (original_where)
|
||||
{
|
||||
if (isCompatible(*original_where))
|
||||
{
|
||||
select->where_expression = original_where;
|
||||
}
|
||||
else if (const ASTFunction * function = typeid_cast<const ASTFunction *>(original_where.get()))
|
||||
{
|
||||
if (function->name == "and")
|
||||
{
|
||||
auto new_function_and = std::make_shared<ASTFunction>();
|
||||
auto new_function_and_arguments = std::make_shared<ASTExpressionList>();
|
||||
new_function_and->arguments = new_function_and_arguments;
|
||||
new_function_and->children.push_back(new_function_and_arguments);
|
||||
|
||||
for (const auto & elem : function->arguments->children)
|
||||
if (isCompatible(*elem))
|
||||
new_function_and_arguments->children.push_back(elem);
|
||||
|
||||
select->where_expression = std::move(new_function_and);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queryToString(select);
|
||||
}
|
||||
|
||||
}
|
33
dbms/src/Storages/transformQueryForExternalDatabase.h
Normal file
33
dbms/src/Storages/transformQueryForExternalDatabase.h
Normal file
@ -0,0 +1,33 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Types.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IAST;
|
||||
class Context;
|
||||
|
||||
/** For given ClickHouse query,
|
||||
* creates another query in a form of
|
||||
*
|
||||
* SELECT columns... FROM db.table WHERE ...
|
||||
*
|
||||
* where 'columns' are all required columns to read from "left" table of original query,
|
||||
* and WHERE contains subset of (AND-ed) conditions from original query,
|
||||
* that contain only compatible expressions.
|
||||
*
|
||||
* Compatible expressions are comparisons of identifiers, constants, and logical operations on them.
|
||||
*
|
||||
* NOTE There are concerns with proper quoting of identifiers for remote database.
|
||||
* Some databases use `quotes` and other use "quotes".
|
||||
*/
|
||||
String transformQueryForExternalDatabase(
|
||||
const IAST & query,
|
||||
const NamesAndTypesList & available_columns,
|
||||
const String & database,
|
||||
const String & table,
|
||||
const Context & context);
|
||||
|
||||
}
|
@ -92,29 +92,24 @@ StoragePtr TableFunctionMySQL::execute(const ASTPtr & ast_function, const Contex
|
||||
insertColumn(sample_block, "Extra");
|
||||
std::string table_name = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
|
||||
MySQLBlockInputStream result(pool.Get(), std::string("DESCRIBE ") + table_name, sample_block, 1 << 16);
|
||||
Block resultBlock = result.read();
|
||||
const IColumn & names = *resultBlock.getByPosition(0).column.get();
|
||||
const IColumn & types = *resultBlock.getByPosition(1).column.get();
|
||||
Block result_block = result.read();
|
||||
const IColumn & names = *result_block.getByPosition(0).column.get();
|
||||
const IColumn & types = *result_block.getByPosition(1).column.get();
|
||||
size_t field_count = names.size();
|
||||
NamesAndTypesListPtr columns = std::make_shared<NamesAndTypesList>();
|
||||
NamesAndTypesList materialized_columns;
|
||||
NamesAndTypesList alias_columns;
|
||||
ColumnDefaults column_defaults;
|
||||
NamesAndTypesList columns;
|
||||
|
||||
for (size_t i = 0; i < field_count; ++i)
|
||||
{
|
||||
columns->push_back(NameAndTypePair(names.getDataAt(i).data, getDataType(types.getDataAt(i).data)));
|
||||
}
|
||||
auto res = StorageMySQL::create(table_name,
|
||||
|
||||
auto res = StorageMySQL::create(
|
||||
table_name,
|
||||
host_port,
|
||||
database_name,
|
||||
table_name,
|
||||
user_name,
|
||||
password,
|
||||
columns,
|
||||
materialized_columns,
|
||||
alias_columns,
|
||||
column_defaults,
|
||||
context);
|
||||
columns);
|
||||
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/* mysql ('host:port', database, table, user, password) - creates a temporary StorageMySQL.
|
||||
* The structure of the table is taken from the mysql query DESCRIBE table.
|
||||
* If there is no such table, an exception is thrown.
|
||||
@ -19,4 +20,5 @@ public:
|
||||
}
|
||||
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user