Add postgres dictionary source

This commit is contained in:
kssenii 2020-12-10 00:14:16 +03:00
parent 9cb2c75464
commit 375e8e9736
11 changed files with 466 additions and 1 deletions

View File

@ -41,6 +41,9 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
for (const auto idx : ext::range(0, description.sample_block.columns())) for (const auto idx : ext::range(0, description.sample_block.columns()))
if (description.types[idx].first == ValueType::vtArray) if (description.types[idx].first == ValueType::vtArray)
prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type); prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type);
/// pqxx::stream_from uses COPY command, but when selecting from dictionary will get ';', it is not needed
if (query_str.ends_with(';'))
query_str.resize(query_str.size() - 1);
} }

View File

@ -41,7 +41,7 @@ private:
} }
void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type); void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type);
const String query_str; String query_str;
const UInt64 max_block_size; const UInt64 max_block_size;
ExternalResultDescription description; ExternalResultDescription description;

View File

@ -0,0 +1,177 @@
#include "PostgreSQLDictionarySource.h"
#if USE_LIBPQXX
# include <Columns/ColumnString.h>
# include <DataTypes/DataTypeString.h>
# include <IO/WriteBufferFromString.h>
# include <IO/WriteHelpers.h>
# include <common/LocalDateTime.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include "readInvalidateQuery.h"
#include "DictionarySourceFactory.h"
namespace DB
{
static const UInt64 max_block_size = 8192;
PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
const std::string & connection_str,
const Block & sample_block_)
: dict_struct{dict_struct_}
, sample_block(sample_block_)
, connection(std::make_shared<pqxx::connection>(connection_str))
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), ""))
, where(config_.getString(fmt::format("{}.where", config_prefix), ""))
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes)
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), ""))
{
}
/// copy-constructor is provided in order to support cloneability
PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other)
: dict_struct(other.dict_struct)
, sample_block(other.sample_block)
, connection(other.connection)
, log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(other.db)
, table(other.table)
, where(other.where)
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes)
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(other.invalidate_query)
, update_time(other.update_time)
, invalidate_query_response(other.invalidate_query_response)
{
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<PostgreSQLBlockInputStream>(
connection, load_all_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
{
auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<PostgreSQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<PostgreSQLBlockInputStream>(connection, query, sample_block, max_block_size);
}
bool PostgreSQLDictionarySource::isModified() const
{
if (!invalidate_query.empty())
{
auto response = doInvalidateQuery(invalidate_query);
if (response == invalidate_query_response)
return false;
invalidate_query_response = response;
}
return true;
}
std::string PostgreSQLDictionarySource::doInvalidateQuery(const std::string & request) const
{
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
PostgreSQLBlockInputStream block_input_stream(connection, request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
}
bool PostgreSQLDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
std::string PostgreSQLDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
std::string str_time = std::to_string(LocalDateTime(hr_time));
return query_builder.composeUpdateQuery(update_field, str_time);
}
else
{
update_time = std::chrono::system_clock::now();
return query_builder.composeLoadAllQuery();
}
}
bool PostgreSQLDictionarySource::supportsSelectiveLoad() const
{
return true;
}
DictionarySourcePtr PostgreSQLDictionarySource::clone() const
{
return std::make_unique<PostgreSQLDictionarySource>(*this);
}
std::string PostgreSQLDictionarySource::toString() const
{
return "PostgreSQL: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & root_config_prefix,
Block & sample_block,
const Context & /* context */,
const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr
{
const auto config_prefix = root_config_prefix + ".postgresql";
auto connection_str = fmt::format("dbname={} host={} port={} user={} password={}",
config.getString(fmt::format("{}.db", config_prefix), ""),
config.getString(fmt::format("{}.host", config_prefix), ""),
config.getUInt(fmt::format("{}.port", config_prefix), 0),
config.getString(fmt::format("{}.user", config_prefix), ""),
config.getString(fmt::format("{}.password", config_prefix), ""));
return std::make_unique<PostgreSQLDictionarySource>(
dict_struct, config, config_prefix, connection_str, sample_block);
};
factory.registerSource("postgresql", create_table_source);
}
}
#endif

View File

@ -0,0 +1,71 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
# include <common/LocalDateTime.h>
# include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h"
# include "IDictionarySource.h"
#include <Core/Block.h>
#include <common/logger_useful.h>
#include <pqxx/pqxx>
namespace DB
{
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
/// Allows loading dictionaries from a PostgreSQL database
class PostgreSQLDictionarySource final : public IDictionarySource
{
public:
PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix,
const std::string & connection_str,
const Block & sample_block_);
/// copy-constructor is provided in order to support cloneability
PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other);
PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override;
bool hasUpdateField() const override;
DictionarySourcePtr clone() const override;
std::string toString() const override;
private:
std::string getUpdateFieldAndDate();
std::string doInvalidateQuery(const std::string & request) const;
const DictionaryStructure dict_struct;
Block sample_block;
ConnectionPtr connection;
Poco::Logger * log;
const std::string db;
const std::string table;
const std::string where;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string invalidate_query;
std::chrono::time_point<std::chrono::system_clock> update_time;
const std::string update_field;
mutable std::string invalidate_query_response;
};
}
#endif

View File

@ -16,6 +16,7 @@ void registerDictionaries()
registerDictionarySourceCassandra(source_factory); registerDictionarySourceCassandra(source_factory);
registerDictionarySourceXDBC(source_factory); registerDictionarySourceXDBC(source_factory);
registerDictionarySourceJDBC(source_factory); registerDictionarySourceJDBC(source_factory);
registerDictionarySourcePostgreSQL(source_factory);
registerDictionarySourceExecutable(source_factory); registerDictionarySourceExecutable(source_factory);
registerDictionarySourceHTTP(source_factory); registerDictionarySourceHTTP(source_factory);
registerDictionarySourceLibrary(source_factory); registerDictionarySourceLibrary(source_factory);

View File

@ -13,6 +13,7 @@ void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory)
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory); void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory); void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory); void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourcePostgreSQL(DictionarySourceFactory & source_factory);
void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory); void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory);
void registerDictionarySourceHTTP(DictionarySourceFactory & source_factory); void registerDictionarySourceHTTP(DictionarySourceFactory & source_factory);
void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory); void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory);

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<yandex>
<dictionaries_config>/etc/clickhouse-server/config.d/postgres_dict.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="utf-8"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,73 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict0</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test0</table>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
<dictionary>
<name>dict1</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test1</table>
<invalidate_query>SELECT value FROM test1 WHERE id = 0</invalidate_query>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>
<min>1</min>
<max>1</max>
</lifetime>
</dictionary>
</yandex>

View File

@ -0,0 +1,123 @@
import pytest
import time
import psycopg2
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config.xml', 'configs/postgres_dict.xml', 'configs/log_conf.xml'], with_postgres=True)
postgres_dict_table_template = """
CREATE TABLE IF NOT EXISTS {} (
id Integer NOT NULL, value Integer NOT NULL, PRIMARY KEY (id))
"""
click_dict_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (
`id` UInt64, `value` UInt32
) ENGINE = Dictionary({})
"""
def get_postgres_conn(database=False):
if database == True:
conn_string = "host='localhost' dbname='clickhouse' user='postgres' password='mysecretpassword'"
else:
conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(conn, table_name):
cursor = conn.cursor()
cursor.execute(postgres_dict_table_template.format(table_name))
def create_and_fill_postgres_table(table_name, index=0):
table_name = table_name + str(index)
conn = get_postgres_conn(True)
create_postgres_table(conn, table_name)
# Fill postgres table using clickhouse postgres table function and check
table_func = '''postgresql('postgres1:5432', 'clickhouse', '{}', 'postgres', 'mysecretpassword')'''.format(table_name)
node1.query('''INSERT INTO TABLE FUNCTION {} SELECT number, number from numbers(10000)
'''.format(table_func, table_name))
result = node1.query("SELECT count() FROM {}".format(table_func))
assert result.rstrip() == '10000'
def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name + str(index), 'dict' + str(index)))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
postgres_conn = get_postgres_conn()
node1.query("CREATE DATABASE IF NOT EXISTS test")
print("postgres connected")
create_postgres_db(postgres_conn, 'clickhouse')
yield cluster
finally:
cluster.shutdown()
def test_load_dictionaries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test'
create_and_fill_postgres_table(table_name)
create_dict(table_name)
table_name += str(0)
dict_name = 'dict0'
node1.query("SYSTEM RELOAD DICTIONARIES")
assert node1.query("SELECT count() FROM `test`.`dict_table_{}`".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT dictGetUInt32('{}', 'id', toUInt64(0))".format(dict_name)) == '0\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)) == '9999\n'
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
def test_invalidate_query(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test'
create_and_fill_postgres_table(table_name, 0)
create_and_fill_postgres_table(table_name, 1)
# this dict has no invalidate query
dict_name = 'dict0'
create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARIES")
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
time.sleep(4)
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
assert first_update_time != second_update_time
# this dict has invalidate query: SELECT value FROM test1 WHERE id = 0
dict_name = 'dict1'
create_dict(table_name, 1)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == "1\n"
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
time.sleep(4)
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(table_name))
assert first_update_time != second_update_time
# no update should be made
cursor.execute("UPDATE {} SET value=value*2 WHERE id > 0".format(table_name+str(1)))
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '1\n'
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id=0".format(table_name+str(1)))
time.sleep(5)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '2\n'
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()