diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index 9fcc9f56b53..cb3f9804e8a 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -1,21 +1,20 @@ #include "PostgreSQLDictionarySource.h" -#if USE_LIBPQXX +#include +#include "DictionarySourceFactory.h" +#include "registerDictionaries.h" -# include -# include -# include -# include -# include +#if USE_LIBPQXX +#include +#include #include #include "readInvalidateQuery.h" -#include "DictionarySourceFactory.h" + namespace DB { static const UInt64 max_block_size = 8192; - PostgreSQLDictionarySource::PostgreSQLDictionarySource( const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config_, @@ -32,6 +31,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource( , query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes) , load_all_query(query_builder.composeLoadAllQuery()) , invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), "")) + , update_field(config_.getString(fmt::format("{}.update_field", config_prefix), "")) { } @@ -49,6 +49,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar , load_all_query(query_builder.composeLoadAllQuery()) , invalidate_query(other.invalidate_query) , update_time(other.update_time) + , update_field(other.update_field) , invalidate_query_response(other.invalidate_query_response) { } @@ -69,7 +70,6 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll() return std::make_shared(connection, load_update_query, sample_block, max_block_size); } - BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector & ids) { const auto query = query_builder.composeLoadIdsQuery(ids); @@ -159,6 +159,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) const std::string & /* default_database */, bool /* check_config */) -> DictionarySourcePtr { +#if USE_LIBPQXX const auto config_prefix = root_config_prefix + ".postgresql"; auto connection_str = fmt::format("dbname={} host={} port={} user={} password={}", config.getString(fmt::format("{}.db", config_prefix), ""), @@ -169,9 +170,17 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) return std::make_unique( dict_struct, config, config_prefix, connection_str, sample_block); +#else + (void)dict_struct; + (void)config; + (void)root_config_prefix; + (void)sample_block; + throw Exception{"Dictionary source of type `postgresql` is disabled because ClickHouse was built without postgresql support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif }; factory.registerSource("postgresql", create_table_source); } } - #endif + diff --git a/src/Dictionaries/PostgreSQLDictionarySource.h b/src/Dictionaries/PostgreSQLDictionarySource.h index b56135ca0b2..30e90d7dde5 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.h +++ b/src/Dictionaries/PostgreSQLDictionarySource.h @@ -3,16 +3,14 @@ #if !defined(ARCADIA_BUILD) #include "config_core.h" #endif +#include "DictionaryStructure.h" +#include "IDictionarySource.h" #if USE_LIBPQXX - -# include -# include "DictionaryStructure.h" -# include "ExternalQueryBuilder.h" -# include "IDictionarySource.h" +#include "ExternalQueryBuilder.h" #include +#include #include - #include namespace DB diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index ebbe49e78bf..7be396425b0 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -130,7 +130,6 @@ void PostgreSQLBlockOutputStream::write(const Block & block) } } } - /// pqxx::stream_to is much faster than simple insert, especially for large number of rows stream_inserter->write_values(row); } diff --git a/tests/integration/test_dictionaries_postgresql/configs/postgres_dict.xml b/tests/integration/test_dictionaries_postgresql/configs/postgres_dict.xml index aeb3787a89e..e3224b39a7d 100644 --- a/tests/integration/test_dictionaries_postgresql/configs/postgres_dict.xml +++ b/tests/integration/test_dictionaries_postgresql/configs/postgres_dict.xml @@ -10,40 +10,7 @@ postgres mysecretpassword test0
- - - - - - - - id - UInt32 - - - id - UInt32 - - - - value - UInt32 - - - - 1 - - - dict1 - - - clickhouse - postgres1 - 5432 - postgres - mysecretpassword - test1
- SELECT value FROM test1 WHERE id = 0 + SELECT value FROM test0 WHERE id = 0
diff --git a/tests/integration/test_dictionaries_postgresql/test.py b/tests/integration/test_dictionaries_postgresql/test.py index 1c0d94d56a6..cd7d575a999 100644 --- a/tests/integration/test_dictionaries_postgresql/test.py +++ b/tests/integration/test_dictionaries_postgresql/test.py @@ -36,8 +36,7 @@ def create_postgres_table(conn, table_name): cursor = conn.cursor() cursor.execute(postgres_dict_table_template.format(table_name)) -def create_and_fill_postgres_table(table_name, index=0): - table_name = table_name + str(index) +def create_and_fill_postgres_table(table_name): conn = get_postgres_conn(True) create_postgres_table(conn, table_name) # Fill postgres table using clickhouse postgres table function and check @@ -48,7 +47,7 @@ def create_and_fill_postgres_table(table_name, index=0): assert result.rstrip() == '10000' def create_dict(table_name, index=0): - node1.query(click_dict_table_template.format(table_name + str(index), 'dict' + str(index))) + node1.query(click_dict_table_template.format(table_name, 'dict' + str(index))) @pytest.fixture(scope="module") @@ -68,10 +67,9 @@ def started_cluster(): def test_load_dictionaries(started_cluster): conn = get_postgres_conn(True) cursor = conn.cursor() - table_name = 'test' + table_name = 'test0' create_and_fill_postgres_table(table_name) create_dict(table_name) - table_name += str(0) dict_name = 'dict0' node1.query("SYSTEM RELOAD DICTIONARIES") @@ -84,36 +82,34 @@ def test_load_dictionaries(started_cluster): def test_invalidate_query(started_cluster): conn = get_postgres_conn(True) cursor = conn.cursor() - table_name = 'test' - create_and_fill_postgres_table(table_name, 0) - create_and_fill_postgres_table(table_name, 1) + table_name = 'test0' + create_and_fill_postgres_table(table_name) - # this dict has no invalidate query + # invalidate query: SELECT value FROM test0 WHERE id = 0 dict_name = 'dict0' create_dict(table_name) - node1.query("SYSTEM RELOAD DICTIONARIES") - first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name)) - time.sleep(4) - second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name)) - assert first_update_time != second_update_time - - # this dict has invalidate query: SELECT value FROM test1 WHERE id = 0 - dict_name = 'dict1' - create_dict(table_name, 1) + node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name)) + assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == "0\n" assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == "1\n" - first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(dict_name)) - time.sleep(4) - second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = '{}'".format(table_name)) - assert first_update_time != second_update_time + # update should happen + cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name)) + while True: + result = node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) + if result != '0\n': + break + assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n' - # no update should be made - cursor.execute("UPDATE {} SET value=value*2 WHERE id > 0".format(table_name+str(1))) + # no update should happen + cursor.execute("UPDATE {} SET value=value*2 WHERE id != 0".format(table_name)) + time.sleep(5) + assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '1\n' assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '1\n' # update should happen - cursor.execute("UPDATE {} SET value=value+1 WHERE id=0".format(table_name+str(1))) + cursor.execute("UPDATE {} SET value=value+1 WHERE id = 0".format(table_name)) time.sleep(5) + assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(0))".format(dict_name)) == '2\n' assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(1))".format(dict_name)) == '2\n'