This commit is contained in:
kssenii 2020-12-13 22:48:40 +00:00
parent 375e8e9736
commit d95e8e2d74
5 changed files with 45 additions and 76 deletions

View File

@ -1,21 +1,20 @@
#include "PostgreSQLDictionarySource.h"
#if USE_LIBPQXX
#include <Poco/Util/AbstractConfiguration.h>
#include "DictionarySourceFactory.h"
#include "registerDictionaries.h"
# include <Columns/ColumnString.h>
# include <DataTypes/DataTypeString.h>
# include <IO/WriteBufferFromString.h>
# include <IO/WriteHelpers.h>
# include <common/LocalDateTime.h>
#if USE_LIBPQXX
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include "readInvalidateQuery.h"
#include "DictionarySourceFactory.h"
namespace DB
{
/// Block size handed to PostgreSQLBlockInputStream as its last constructor argument (see loadUpdatedAll()).
/// NOTE(review): presumably the maximum number of rows per produced block -- confirm against the stream's constructor.
static const UInt64 max_block_size = 8192;
PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
@ -32,6 +31,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes)
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), ""))
, update_field(config_.getString(fmt::format("{}.update_field", config_prefix), ""))
{
}
@ -49,6 +49,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
, load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(other.invalidate_query)
, update_time(other.update_time)
, update_field(other.update_field)
, invalidate_query_response(other.invalidate_query_response)
{
}
@ -69,7 +70,6 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
return std::make_shared<PostgreSQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
@ -159,6 +159,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr
{
#if USE_LIBPQXX
const auto config_prefix = root_config_prefix + ".postgresql";
auto connection_str = fmt::format("dbname={} host={} port={} user={} password={}",
config.getString(fmt::format("{}.db", config_prefix), ""),
@ -169,9 +170,17 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
return std::make_unique<PostgreSQLDictionarySource>(
dict_struct, config, config_prefix, connection_str, sample_block);
#else
(void)dict_struct;
(void)config;
(void)root_config_prefix;
(void)sample_block;
throw Exception{"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("postgresql", create_table_source);
}
}
#endif

View File

@ -3,16 +3,14 @@
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#if USE_LIBPQXX
# include <common/LocalDateTime.h>
# include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h"
# include "IDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <common/LocalDateTime.h>
#include <common/logger_useful.h>
#include <pqxx/pqxx>
namespace DB

View File

@ -130,7 +130,6 @@ void PostgreSQLBlockOutputStream::write(const Block & block)
}
}
}
/// pqxx::stream_to is much faster than a simple insert, especially for a large number of rows
stream_inserter->write_values(row);
}

View File

@ -10,40 +10,7 @@
<user>postgres</user>
<password>mysecretpassword</password>
<table>test0</table>
</postgresql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
</id>
<attribute>
<name>id</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>1</lifetime>
</dictionary>
<dictionary>
<name>dict1</name>
<source>
<postgresql>
<db>clickhouse</db>
<host>postgres1</host>
<port>5432</port>
<user>postgres</user>
<password>mysecretpassword</password>
<table>test1</table>
<invalidate_query>SELECT value FROM test1 WHERE id = 0</invalidate_query>
<invalidate_query>SELECT value FROM test0 WHERE id = 0</invalidate_query>
</postgresql>
</source>
<layout>

View File

@ -36,8 +36,7 @@ def create_postgres_table(conn, table_name):
cursor = conn.cursor()
cursor.execute(postgres_dict_table_template.format(table_name))
def create_and_fill_postgres_table(table_name, index=0):
table_name = table_name + str(index)
def create_and_fill_postgres_table(table_name):
conn = get_postgres_conn(True)
create_postgres_table(conn, table_name)
# Fill postgres table using clickhouse postgres table function and check
@ -48,7 +47,7 @@ def create_and_fill_postgres_table(table_name, index=0):
assert result.rstrip() == '10000'
def create_dict(table_name, index=0):
node1.query(click_dict_table_template.format(table_name + str(index), 'dict' + str(index)))
node1.query(click_dict_table_template.format(table_name, 'dict' + str(index)))
@pytest.fixture(scope="module")
@ -68,10 +67,9 @@ def started_cluster():
def test_load_dictionaries(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test'
table_name = 'test0'
create_and_fill_postgres_table(table_name)
create_dict(table_name)
table_name += str(0)
dict_name = 'dict0'
node1.query("SYSTEM RELOAD DICTIONARIES")
@ -84,36 +82,34 @@ def test_load_dictionaries(started_cluster):
def test_invalidate_query(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
table_name = 'test'
create_and_fill_postgres_table(table_name, 0)
create_and_fill_postgres_table(table_name, 1)
table_name = 'test0'
create_and_fill_postgres_table(table_name)
# this dict has no invalidate query
# invalidate query: SELECT value FROM test0 WHERE id = 0
dict_name = 'dict0'
create_dict(table_name)
node1.query("SYSTEM RELOAD DICTIONARIES")
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
time.sleep(4)
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
assert first_update_time != second_update_time
# this dict has invalidate query: SELECT value FROM test1 WHERE id = 0
dict_name = 'dict1'
create_dict(table_name, 1)
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == "0\n"
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == "1\n"
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name))
time.sleep(4)
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(table_name))
assert first_update_time != second_update_time
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
while True:
result = node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name))
if result != '0\n':
break
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n'
# no update should be made
cursor.execute("UPDATE {} SET value=value*2 WHERE id > 0".format(table_name+str(1)))
# no update should happen
cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name))
time.sleep(5)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '1\n'
# update should happen
cursor.execute("UPDATE {} SET value=value+1 WHERE id=0".format(table_name+str(1)))
cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name))
time.sleep(5)
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '2\n'
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '2\n'