Merge pull request #3389 from yandex/CLICKHOUSE-4067

CLICKHOUSE-4067: Fixes in odbc dictionaries. Now dictionaries receive…
alesapin 2018-10-16 15:37:23 +03:00, committed by GitHub
commit 457f8fd495
11 changed files with 190 additions and 34 deletions

View File

@@ -254,7 +254,8 @@ struct ODBCBridgeMixin
    static void startBridge(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
    {
-        Poco::Path path{config.getString("application.dir", "")};
+        /// Path to executable folder
+        Poco::Path path{config.getString("application.dir", "/usr/bin")};

        path.setFileName(
#if CLICKHOUSE_SPLIT_BINARY
@@ -264,9 +265,6 @@ struct ODBCBridgeMixin
#endif
        );

-        if (!Poco::File(path).exists())
-            throw Exception("clickhouse binary (" + path.toString() + ") is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);

        std::stringstream command;
        command << path.toString() <<
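
Net effect of this file's changes: the bridge executable is now looked up in `/usr/bin` when the server does not know its own directory, and the up-front existence check is dropped. A minimal sketch of the resulting lookup rule, in Python for illustration (the real code uses `Poco::Path`; the binary names are assumptions based on the ClickHouse build layout):

```python
import os

def bridge_path(app_dir, split_binary):
    # Fall back to /usr/bin when "application.dir" is not configured.
    base = app_dir or "/usr/bin"
    # Assumed names: a split build ships a separate clickhouse-odbc-bridge
    # binary; a monolithic build reuses the single clickhouse binary.
    name = "clickhouse-odbc-bridge" if split_binary else "clickhouse"
    return os.path.join(base, name)

assert bridge_path("", False) == "/usr/bin/clickhouse"
```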

View File

@@ -155,7 +155,8 @@ DictionarySourcePtr DictionarySourceFactory::create(
    else if ("odbc" == source_type)
    {
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
-        BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
+        const auto & global_config = context.getConfigRef();
+        BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(global_config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
        return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
#else
        throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",

View File

@@ -14,17 +14,28 @@ namespace DB

namespace ErrorCodes
{
    extern const int UNSUPPORTED_METHOD;
+    extern const int LOGICAL_ERROR;
}

ExternalQueryBuilder::ExternalQueryBuilder(
-    const DictionaryStructure & dict_struct,
-    const std::string & db,
-    const std::string & table,
-    const std::string & where,
-    IdentifierQuotingStyle quoting_style)
-    : dict_struct(dict_struct), db(db), table(table), where(where), quoting_style(quoting_style)
+    const DictionaryStructure & dict_struct_,
+    const std::string & db_,
+    const std::string & table_,
+    const std::string & where_,
+    IdentifierQuotingStyle quoting_style_)
+    : dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
{
+    if (auto pos = table_.find('.'); pos != std::string::npos)
+    {
+        schema = table_.substr(0, pos);
+        table = table_.substr(pos + 1);
+    }
+    else
+    {
+        schema = "";
+        table = table_;
+    }
}
@@ -124,6 +135,11 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
        writeQuoted(db, out);
        writeChar('.', out);
    }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
    writeQuoted(table, out);

    if (!where.empty())
@@ -187,6 +203,12 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
        writeQuoted(db, out);
        writeChar('.', out);
    }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
    writeQuoted(table, out);

    writeString(" WHERE ", out);
@@ -250,6 +272,12 @@ std::string ExternalQueryBuilder::composeLoadKeysQuery(
        writeQuoted(db, out);
        writeChar('.', out);
    }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
    writeQuoted(table, out);

    writeString(" WHERE ", out);
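
A minimal sketch of the splitting rule above and its effect on the composed queries (Python for illustration; double-quote identifier quoting assumed, as an ODBC PostgreSQL source would use):

```python
def split_table(table):
    # The first dot in <table> separates schema from table name,
    # mirroring table_.find('.') in the constructor above.
    pos = table.find('.')
    if pos != -1:
        return table[:pos], table[pos + 1:]
    return '', table

def qualified_name(db, table):
    # db, then schema, then table, each quoted separately -- the same
    # order in which composeLoadAllQuery() writes them.
    schema, name = split_table(table)
    parts = [part for part in (db, schema, name) if part]
    return '.'.join('"{}"'.format(part) for part in parts)

# <db>postgres</db> with <table>clickhouse.test_table</table> yields:
assert qualified_name('postgres', 'clickhouse.test_table') == '"postgres"."clickhouse"."test_table"'
```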

View File

@@ -18,19 +18,20 @@ class WriteBuffer;
struct ExternalQueryBuilder
{
    const DictionaryStructure & dict_struct;
-    const std::string & db;
-    const std::string & table;
+    std::string db;
+    std::string table;
+    std::string schema;
    const std::string & where;
    IdentifierQuotingStyle quoting_style;

    ExternalQueryBuilder(
-        const DictionaryStructure & dict_struct,
-        const std::string & db,
-        const std::string & table,
-        const std::string & where,
-        IdentifierQuotingStyle quoting_style);
+        const DictionaryStructure & dict_struct_,
+        const std::string & db_,
+        const std::string & table_,
+        const std::string & where_,
+        IdentifierQuotingStyle quoting_style_);

    /** Generate a query to load all data. */
    std::string composeLoadAllQuery() const;

View File

@@ -14,9 +14,9 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
-* [docker-compose](https://docs.docker.com/compose/) and additional Python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
+* [docker-compose](https://docs.docker.com/compose/) and additional Python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2`

-(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of pip: `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo`
+(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of pip: `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`
If you want to run the tests under a non-privileged user, you must add this user to the `docker` group (`sudo usermod -aG docker $USER`) and re-login.
For the change to take effect, close all your sessions (for example, by restarting your computer).

View File

@@ -13,6 +13,7 @@ import pymysql
import xml.dom.minidom
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
+import psycopg2

import docker
from docker.errors import ContainerError
@@ -79,6 +80,7 @@ class ClickHouseCluster:
        self.instances = {}
        self.with_zookeeper = False
        self.with_mysql = False
+        self.with_postgres = False
        self.with_kafka = False
        self.with_odbc_drivers = False
@@ -92,7 +94,7 @@ class ClickHouseCluster:
            cmd += " client"
        return cmd

-    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
+    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@@ -127,6 +129,12 @@ class ClickHouseCluster:
            self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]

+        if with_postgres and not self.with_postgres:
+            self.with_postgres = True
+            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
+            self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                      self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]

        if with_odbc_drivers and not self.with_odbc_drivers:
            self.with_odbc_drivers = True
            if not self.with_mysql:
@@ -134,6 +142,12 @@ class ClickHouseCluster:
                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]

+            if not self.with_postgres:
+                self.with_postgres = True
+                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
+                self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                          self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]

        if with_kafka and not self.with_kafka:
            self.with_kafka = True
@@ -168,6 +182,21 @@ class ClickHouseCluster:
        raise Exception("Cannot wait MySQL container")

+    def wait_postgres_to_start(self, timeout=60):
+        start = time.time()
+        while time.time() - start < timeout:
+            try:
+                conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
+                conn = psycopg2.connect(conn_string)
+                conn.close()
+                print "Postgres Started"
+                return
+            except Exception as ex:
+                print "Can't connect to Postgres " + str(ex)
+                time.sleep(0.5)
+
+        raise Exception("Cannot wait Postgres container")
+
    def wait_zookeeper_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
@@ -204,20 +233,24 @@ class ClickHouseCluster:
        self.docker_client = docker.from_env(version=self.docker_api_version)

        if self.with_zookeeper and self.base_zookeeper_cmd:
-            subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)
            self.wait_zookeeper_to_start(120)

        if self.with_mysql and self.base_mysql_cmd:
-            subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
            self.wait_mysql_to_start(120)

+        if self.with_postgres and self.base_postgres_cmd:
+            subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
+            self.wait_postgres_to_start(120)
+
        if self.with_kafka and self.base_kafka_cmd:
-            subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')

-        subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+        subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate'])

        start_deadline = time.time() + 20.0 # seconds
        for instance in self.instances.itervalues():
@@ -444,8 +477,18 @@ class ClickHouseInstance:
                },
                "PostgreSQL": {
                    "DSN": "postgresql_odbc",
+                    "Database": "postgres",
+                    "UserName": "postgres",
+                    "Password": "mysecretpassword",
+                    "Port": "5432",
+                    "Servername": "postgres1",
+                    "Protocol": "9.3",
+                    "ReadOnly": "No",
+                    "RowVersioning": "No",
+                    "ShowSystemTables": "No",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
+                    "ConnSettings": "",
                }
            }
        else:
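
For reference, a hedged sketch of how such a driver dictionary could be turned into `odbc.ini` text (the function is hypothetical; the actual file generation happens elsewhere in `cluster.py`, but the section-per-DSN, `key = value` layout is standard for odbc.ini):

```python
def render_odbc_ini(drivers):
    # Hypothetical renderer: one [section] per driver, named after its DSN;
    # every other key becomes a "key = value" line.
    lines = []
    for settings in drivers.values():
        lines.append("[{}]".format(settings["DSN"]))
        for key, value in settings.items():
            if key != "DSN":
                lines.append("{} = {}".format(key, value))
        lines.append("")
    return "\n".join(lines)
```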

View File

@@ -0,0 +1,9 @@
+version: '2'
+services:
+    postgres1:
+        image: postgres
+        restart: always
+        environment:
+            POSTGRES_PASSWORD: mysecretpassword
+        ports:
+            - 5432:5432
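
Given the `5432:5432` port mapping above, a quick manual connectivity check from the host could look like this (a sketch mirroring `wait_postgres_to_start`; Python 2 style to match the test harness):

```python
import psycopg2

# Same credentials as in the compose file above.
conn = psycopg2.connect("host='localhost' port=5432 user='postgres' password='mysecretpassword'")
cursor = conn.cursor()
cursor.execute("SELECT 1")
print cursor.fetchone()[0]  # prints 1 once the container accepts connections
conn.close()
```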

View File

@@ -0,0 +1,38 @@
+<dictionaries>
+    <dictionary>
+        <name>postgres_odbc_hashed</name>
+        <source>
+            <odbc>
+                <table>clickhouse.test_table</table>
+                <connection_string>DSN=postgresql_odbc;</connection_string>
+                <db>postgres</db>
+            </odbc>
+        </source>
+        <lifetime>
+            <min>5</min>
+            <max>5</max>
+        </lifetime>
+        <layout>
+            <hashed />
+        </layout>
+        <structure>
+            <id>
+                <name>column1</name>
+            </id>
+            <attribute>
+                <name>column1</name>
+                <type>Int64</type>
+                <null_value>1</null_value>
+            </attribute>
+            <attribute>
+                <name>column2</name>
+                <type>String</type>
+                <null_value>''</null_value>
+            </attribute>
+        </structure>
+    </dictionary>
+</dictionaries>

View File

@@ -3,12 +3,14 @@ import pytest
import os
import pymysql.cursors
+import psycopg2
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.cluster import ClickHouseCluster

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))

-node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml'])
+node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml', 'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'])
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
@@ -31,24 +33,49 @@ def create_mysql_table(conn, table_name):
    with conn.cursor() as cursor:
        cursor.execute(create_table_sql_template.format(table_name))

+def get_postgres_conn():
+    conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
+    conn = psycopg2.connect(conn_string)
+    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+    conn.autocommit = True
+    return conn
+
+def create_postgres_db(conn, name):
+    cursor = conn.cursor()
+    # Despite its name, this creates a Postgres schema; the dictionary config
+    # addresses it through <table>clickhouse.test_table</table>.
+    cursor.execute("CREATE SCHEMA {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
-        sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]

+        sqlite_db = node1.odbc_drivers["SQLite3"]["Database"]
+        print "sqlite data received"
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t1(x INTEGER PRIMARY KEY ASC, y, z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
        node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
-        conn = get_mysql_conn()
+        print "sqlite tables created"
+        mysql_conn = get_mysql_conn()
+        print "mysql connection received"
        ## create mysql db and table
-        create_mysql_db(conn, 'clickhouse')
+        create_mysql_db(mysql_conn, 'clickhouse')
+        print "mysql database created"

+        postgres_conn = get_postgres_conn()
+        print "postgres connection received"
+        create_postgres_db(postgres_conn, 'clickhouse')
+        print "postgres db created"

+        cursor = postgres_conn.cursor()
+        cursor.execute("create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")

        yield cluster

+    except Exception as ex:
+        print(ex)
+        raise ex
    finally:
        cluster.shutdown()
finally:
cluster.shutdown()
@@ -141,3 +168,11 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
    time.sleep(5)
    assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"

+def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
+    conn = get_postgres_conn()
+    cursor = conn.cursor()
+    cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
+    time.sleep(5)
+    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
+    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"

View File

@@ -111,7 +111,7 @@ Example of settings:

```xml
<odbc>
    <db>DatabaseName</db>
-    <table>TableName</table>
+    <table>SchemaName.TableName</table>
    <connection_string>DSN=some_parameters</connection_string>
    <invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
```

@@ -120,10 +120,11 @@ Example of settings:

Setting fields:

- `db` – Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
-- `table` – Name of the table.
+- `table` – Name of the table, with an optional schema prefix (`SchemaName.TableName`) if the database uses schemas.
- `connection_string` – Connection string.
- `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).

+ClickHouse receives quoting symbols from the ODBC driver and quotes all settings in queries to the driver, so the table name must be set according to the table name case in the database.
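
For example (hypothetical names): with `<table>MySchema.MyTable</table>` and double-quote quoting, the generated query reads `FROM "MySchema"."MyTable"`; since quoted identifiers are case-sensitive in most databases, a table created as lowercase `mytable` in PostgreSQL must be written here as `myschema.mytable`.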
### Known vulnerability of the ODBC dictionary functionality

View File

@@ -111,7 +111,7 @@

```xml
<odbc>
    <db>DatabaseName</db>
-    <table>TableName</table>
+    <table>SchemaName.TableName</table>
    <connection_string>DSN=some_parameters</connection_string>
    <invalidate_query>SQL_QUERY</invalidate_query>
</odbc>
```

@@ -119,11 +119,13 @@

Setting fields:

- `db` – Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
-- `table` – Name of the table.
+- `table` – Name of the table and, if present, the schema.
- `connection_string` – Connection string.
- `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).

+ClickHouse receives quoting information from the ODBC driver and quotes all settings in queries to the driver, so the table name must be specified to match the case of the table name in the database.

### Known vulnerability of the ODBC dictionary functionality

!!! attention