Merge pull request #23980 from kssenii/add-postgres-schema

Add missing table schema for postgres dictionary
This commit is contained in:
alesapin 2021-05-12 10:50:49 +03:00 committed by GitHub
commit ed3f89a7be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 7 deletions

View File

@ -25,6 +25,24 @@ namespace ErrorCodes
static const UInt64 max_block_size = 8192; static const UInt64 max_block_size = 8192;
namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, String & schema, String & table, const String & where)
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
/// Do not need db because it is already in a connection string.
return {dict_struct, "", schema, table, where, IdentifierQuotingStyle::DoubleQuotes};
}
}
PostgreSQLDictionarySource::PostgreSQLDictionarySource( PostgreSQLDictionarySource::PostgreSQLDictionarySource(
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
postgres::PoolWithFailoverPtr pool_, postgres::PoolWithFailoverPtr pool_,
@ -36,9 +54,10 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(
, pool(std::move(pool_)) , pool(std::move(pool_))
, log(&Poco::Logger::get("PostgreSQLDictionarySource")) , log(&Poco::Logger::get("PostgreSQLDictionarySource"))
, db(config_.getString(fmt::format("{}.db", config_prefix), "")) , db(config_.getString(fmt::format("{}.db", config_prefix), ""))
, schema(config_.getString(fmt::format("{}.schema", config_prefix), ""))
, table(config_.getString(fmt::format("{}.table", config_prefix), "")) , table(config_.getString(fmt::format("{}.table", config_prefix), ""))
, where(config_.getString(fmt::format("{}.where", config_prefix), "")) , where(config_.getString(fmt::format("{}.where", config_prefix), ""))
, query_builder(dict_struct, "", "", table, where, IdentifierQuotingStyle::DoubleQuotes) , query_builder(makeExternalQueryBuilder(dict_struct, schema, table, where))
, load_all_query(query_builder.composeLoadAllQuery()) , load_all_query(query_builder.composeLoadAllQuery())
, invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), "")) , invalidate_query(config_.getString(fmt::format("{}.invalidate_query", config_prefix), ""))
, update_field(config_.getString(fmt::format("{}.update_field", config_prefix), "")) , update_field(config_.getString(fmt::format("{}.update_field", config_prefix), ""))

View File

@ -46,8 +46,8 @@ public:
std::string toString() const override; std::string toString() const override;
private: private:
std::string getUpdateFieldAndDate(); String getUpdateFieldAndDate();
std::string doInvalidateQuery(const std::string & request) const; String doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const String & query); BlockInputStreamPtr loadBase(const String & query);
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
@ -55,12 +55,13 @@ private:
postgres::PoolWithFailoverPtr pool; postgres::PoolWithFailoverPtr pool;
Poco::Logger * log; Poco::Logger * log;
const std::string db; const String db;
const std::string table; String schema;
const std::string where; String table;
const String where;
ExternalQueryBuilder query_builder; ExternalQueryBuilder query_builder;
const std::string load_all_query; const std::string load_all_query;
std::string invalidate_query; String invalidate_query;
std::chrono::time_point<std::chrono::system_clock> update_time; std::chrono::time_point<std::chrono::system_clock> update_time;
const std::string update_field; const std::string update_field;
mutable std::string invalidate_query_response; mutable std::string invalidate_query_response;

View File

@ -159,6 +159,35 @@ def test_dictionary_with_replicas(started_cluster):
node1.query("DROP DICTIONARY IF EXISTS dict1") node1.query("DROP DICTIONARY IF EXISTS dict1")
def test_postgres_scema(started_cluster):
conn = get_postgres_conn(port=5432, database=True)
cursor = conn.cursor()
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (id integer, value integer)')
cursor.execute('INSERT INTO test_schema.test_table SELECT i, i FROM generate_series(0, 99) as t(i)')
node1.query('''
CREATE DICTIONARY postgres_dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(POSTGRESQL(
port 5432
host 'postgres1'
user 'postgres'
password 'mysecretpassword'
db 'clickhouse'
table 'test_schema.test_table'))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(1))")
assert(int(result.strip()) == 1)
result = node1.query("SELECT dictGetUInt32(postgres_dict, 'value', toUInt64(99))")
assert(int(result.strip()) == 99)
node1.query("DROP DICTIONARY IF EXISTS postgres_dict")
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")