Fixed issues

This commit is contained in:
Alexander Avdonkin 2017-12-13 19:46:57 +03:00
parent 89db5040df
commit d1ee8c5358
9 changed files with 70 additions and 54 deletions

View File

@ -141,9 +141,9 @@ public:
/// Create Set-s that we can from IN section to use the index on them.
void makeSetsForIndex();
NamesAndTypesList* getColumns()
NamesAndTypesList & getUsedColumns()
{
return &columns;
return columns;
}
private:

View File

@ -1,3 +1,4 @@
#include <sparsehash/dense_hash_map>
#include <unistd.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/AbstractConfiguration.h>

View File

@ -8,8 +8,50 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/typeid_cast.h>
static bool isCompatible(const DB::IAST & where)
{
const DB::ASTFunction * where_expression = typeid_cast<const DB::ASTFunction *>(&where);
if (where_expression)
{
DB::String name = where_expression->name;
if ((name == "and") || (name == "or"))
{
for (const auto & expr: where_expression->arguments->children)
{
if (!isCompatible(*expr.get()))
return false;
}
}
else if (name == "equals" || (name == "notEquals") || (name == "greater") || (name == "less")
|| (name == "lessOrEquals") || (name == "greaterOrEquals"))
{
const auto & children = where_expression->arguments->children;
if ((children.size() != 2) || !isCompatible(*children[0].get()) || !isCompatible(*children[1].get()))
return false;
}
else if (name == "not")
{
const auto & children = where_expression->arguments->children;
if ((children.size() != 1) || !isCompatible(*children[0].get()))
return false;
}
else
return false;
return true;
}
const DB::ASTLiteral * literal = typeid_cast<const DB::ASTLiteral *>(&where);
if (literal)
return true;
const DB::ASTIdentifier * identifier = typeid_cast<const DB::ASTIdentifier *>(&where);
if (identifier)
return true;
return false;
}
namespace DB
{
@ -47,46 +89,10 @@ StorageMySQL::StorageMySQL(
}
}
bool isCompatible(const IAST & where)
{
String name = where.getID();
if ((name == "Function_and") || (name == "Function_or"))
{
const ASTFunction * and_expression = typeid_cast<const ASTFunction *>(&where);
for (const auto & expr: and_expression->arguments->children)
{
if (!isCompatible(*expr.get()))
return false;
}
}
else if (name == "Function_equals" || (name == "Function_notEquals") || (name == "Function_greater") || (name == "Function_less")
|| (name == "Function_lessOrEquals") || (name == "Function_greaterOrEquals"))
{
const ASTFunction * function = typeid_cast<const ASTFunction *>(&where);
const auto & children = function->arguments->children;
if ((children.size() != 2) || !isCompatible(*children[0].get()) || !isCompatible(*children[1].get()))
return false;
}
else if (name == "Function_not")
{
const ASTFunction * function = typeid_cast<const ASTFunction *>(&where);
const auto & children = function->arguments->children;
if ((children.size() != 1) || !isCompatible(*children[0].get()))
return false;
}
else if ((strncmp(name.c_str(), "Identifier_", 11) == 0) || (strncmp(name.c_str(), "Literal_", 8) == 0))
return true;
else
return false;
return true;
}
void dumpWhere(const IAST & where, std::stringstream & stream)
{
IAST::FormatSettings s(stream, false, true);
IAST::FormatState state;
IAST::FormatStateStacked frame;
where.formatImpl(s, state, frame);
where.format(s);
}
/** Function puts to stream compatible expressions of where statement.
@ -120,20 +126,20 @@ void filterWhere(const IAST & where, std::stringstream & stream)
}
}
/** Function analyze query builds select query of all used columns in query_info from table set by table_name parameter with where expression from filtered query_info.
/** Function transformQueryForODBC builds select query of all used columns in query_info from table set by table_name parameter with where expression from filtered query_info.
* Also it is builds sample_block with all columns, where types are found in column_map
*/
std::string analyzeQuery(const SelectQueryInfo & query_info, const Context & context, std::string table_name, NamesAndTypesListPtr columns, google::dense_hash_map<std::string, DataTypePtr> & column_map, Block & sample_block)
std::string transformQueryForODBC(const SelectQueryInfo & query_info, const Context & context, std::string table_name, NamesAndTypesListPtr columns, google::dense_hash_map<std::string, DataTypePtr> & column_map, Block & sample_block)
{
BlockInputStreams res;
StoragePtr storage(NULL);
StoragePtr storage(nullptr);
ExpressionAnalyzer analyzer(query_info.query, context, storage, *columns);
NamesAndTypesList* usedColumns = analyzer.getColumns();
NamesAndTypesList & usedColumns = analyzer.getUsedColumns();
std::stringstream iss;
iss << "SELECT ";
bool first = true;
for (const auto & column: *usedColumns)
for (const auto & column: usedColumns)
{
if (!first)
iss << ",";
@ -176,9 +182,11 @@ BlockInputStreams StorageMySQL::read(
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
BlockInputStreams res;
sample_block.clear();
std::string query = analyzeQuery(query_info, context, mysql_table_name, columns, column_map, sample_block);
std::string query = transformQueryForODBC(query_info, context, mysql_table_name, columns, column_map, sample_block);
// fprintf(stderr, "Query: %s\n", query.c_str());
res.push_back(std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size));
return res;

View File

@ -1,12 +1,13 @@
#pragma once
#include <Storages/IStorage.h>
#include <sparsehash/dense_hash_map>
#include <Poco/Net/SocketAddress.h>
#include <mysqlxx/Pool.h>
#include <sparsehash/dense_hash_map>
namespace DB
{

View File

@ -10,6 +10,7 @@
#include <Common/typeid_cast.h>
namespace DB
{
@ -24,7 +25,7 @@ namespace ErrorCodes
extern const int CANNOT_FIND_FIELD;
};
std::string analyzeQuery(const SelectQueryInfo & query_info, const Context & context, std::string table_name, NamesAndTypesListPtr columns, google::dense_hash_map<std::string, DataTypePtr> & column_map, Block & sample_block);
std::string transformQueryForODBC(const SelectQueryInfo & query_info, const Context & context, std::string table_name, NamesAndTypesListPtr columns, google::dense_hash_map<std::string, DataTypePtr> & column_map, Block & sample_block);
StorageODBC::StorageODBC(
const std::string & table_name_,
@ -55,9 +56,11 @@ BlockInputStreams StorageODBC::read(
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
DB::BlockInputStreams res;
sample_block.clear();
std::string query = analyzeQuery(query_info, context, odbc_table_name, columns, column_map, sample_block);
std::string query = transformQueryForODBC(query_info, context, odbc_table_name, columns, column_map, sample_block);
res.push_back(std::make_shared<ODBCBlockInputStream>(pool.get(), query, sample_block, max_block_size));
return res;
}

View File

@ -1,10 +1,11 @@
#pragma once
#include <Storages/IStorage.h>
#include <sparsehash/dense_hash_map>
#include <Poco/Data/SessionPool.h>
#include <sparsehash/dense_hash_map>
namespace DB
{

View File

@ -6,3 +6,5 @@ list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFuncti
add_library(clickhouse_table_functions ${SPLIT_SHARED} ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions dbms clickhouse_storages_system ${Poco_Foundation_LIBRARY})
target_include_directories (clickhouse_table_functions BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})

View File

@ -65,7 +65,7 @@ StoragePtr TableFunctionMySQL::execute(const ASTPtr & ast_function, const Contex
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
if (args_func.size() != 1)
throw Exception("Table function 'numbers' requires exactly one argument: amount of numbers.",
throw Exception("Table function 'mysql' requires exactly one argument: amount of numbers.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
@ -96,12 +96,12 @@ StoragePtr TableFunctionMySQL::execute(const ASTPtr & ast_function, const Contex
Block resultBlock = result.read();
const IColumn & names = *resultBlock.getByPosition(0).column.get();
const IColumn & types = *resultBlock.getByPosition(1).column.get();
int field_count = names.size();
size_t field_count = names.size();
NamesAndTypesListPtr columns = std::make_shared<NamesAndTypesList>();
NamesAndTypesList materialized_columns;
NamesAndTypesList alias_columns;
ColumnDefaults column_defaults;
for (int i = 0; i < field_count; ++i)
for (size_t i = 0; i < field_count; ++i)
{
columns->push_back(NameAndTypePair(names.getDataAt(i).data, getDataType(types.getDataAt(i).data)));
}

View File

@ -50,7 +50,7 @@ StoragePtr TableFunctionODBC::execute(const ASTPtr & ast_function, const Context
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
if (args_func.size() != 1)
throw Exception("Table function 'numbers' requires exactly one argument: database name and table name.",
throw Exception("Table function 'odbc' requires exactly one argument: database name and table name.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
@ -99,8 +99,8 @@ StoragePtr TableFunctionODBC::execute(const ASTPtr & ast_function, const Context
columns->push_back(NameAndTypePair((char*)column_name, getDataType(type)));
// fprintf(stderr, "Column name: %s type: %i\n", column_name, type);
}
SQLFreeStmt(hstmt, SQL_DROP);
auto result = StorageODBC::create(table_name, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, context);
SQLFreeStmt(hstmt, SQL_DROP);
result->startup();
return result;
}