Better templates

This commit is contained in:
kssenii 2021-03-17 10:52:56 +00:00
parent 87c740730b
commit e722cee375
3 changed files with 19 additions and 25 deletions

View File

@ -23,25 +23,9 @@ namespace DB
{ {
template<> template<typename T>
PostgreSQLBlockInputStream<pqxx::read_transaction>::PostgreSQLBlockInputStream( PostgreSQLBlockInputStream<T>::PostgreSQLBlockInputStream(
std::shared_ptr<pqxx::read_transaction> tx_, std::shared_ptr<T> tx_,
const std::string & query_str_,
const Block & sample_block,
const UInt64 max_block_size_,
bool auto_commit_)
: query_str(query_str_)
, max_block_size(max_block_size_)
, auto_commit(auto_commit_)
, tx(tx_)
{
description.init(sample_block);
}
template<>
PostgreSQLBlockInputStream<pqxx::work>::PostgreSQLBlockInputStream(
std::shared_ptr<pqxx::work> tx_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_, const UInt64 max_block_size_,
@ -138,6 +122,12 @@ void PostgreSQLBlockInputStream<T>::readSuffix()
} }
} }
template
class PostgreSQLBlockInputStream<pqxx::work>;
template
class PostgreSQLBlockInputStream<pqxx::read_transaction>;
} }
#endif #endif

View File

@ -68,8 +68,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll() BlockInputStreamPtr PostgreSQLDictionarySource::loadAll()
{ {
LOG_TRACE(log, load_all_query); LOG_TRACE(log, load_all_query);
auto tx = std::make_shared<pqxx::read_transaction>(*connection->conn()); return loadBase(load_all_query);
return std::make_shared<PostgreSQLBlockInputStream<pqxx::read_transaction>>(tx, load_all_query, sample_block, max_block_size);
} }
@ -77,21 +76,25 @@ BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll()
{ {
auto load_update_query = getUpdateFieldAndDate(); auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query); LOG_TRACE(log, load_update_query);
auto tx = std::make_shared<pqxx::read_transaction>(*connection->conn()); return loadBase(load_update_query);
return std::make_shared<PostgreSQLBlockInputStream<pqxx::read_transaction>>(tx, load_update_query, sample_block, max_block_size);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids) BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
const auto query = query_builder.composeLoadIdsQuery(ids); const auto query = query_builder.composeLoadIdsQuery(ids);
auto tx = std::make_shared<pqxx::read_transaction>(*connection->conn()); return loadBase(query);
return std::make_shared<PostgreSQLBlockInputStream<pqxx::read_transaction>>(tx, query, sample_block, max_block_size);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN); const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadBase(query);
}
BlockInputStreamPtr PostgreSQLDictionarySource::loadBase(const String & query)
{
auto tx = std::make_shared<pqxx::read_transaction>(*connection->conn()); auto tx = std::make_shared<pqxx::read_transaction>(*connection->conn());
return std::make_shared<PostgreSQLBlockInputStream<pqxx::read_transaction>>(tx, query, sample_block, max_block_size); return std::make_shared<PostgreSQLBlockInputStream<pqxx::read_transaction>>(tx, query, sample_block, max_block_size);
} }

View File

@ -37,6 +37,7 @@ public:
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadBase(const String & query);
bool isModified() const override; bool isModified() const override;
bool supportsSelectiveLoad() const override; bool supportsSelectiveLoad() const override;