move get configuration to RedisCommon

This commit is contained in:
JackyWoo 2023-05-25 17:29:22 +08:00
parent 3281aec335
commit 3c2b447472
5 changed files with 59 additions and 59 deletions

View File

@ -151,13 +151,13 @@ RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisAr
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column)
{
String redis_col_key = all_columns.at(0);
const String & redis_col_key = all_columns.at(0);
if (column == redis_col_key)
return RedisColumnType::KEY;
if (storage_type == RedisStorageType::HASH_MAP)
{
String redis_col_field = all_columns.at(1);
const String & redis_col_field = all_columns.at(1);
if (column == redis_col_field)
return RedisColumnType::FIELD;
else
@ -169,6 +169,59 @@ RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names &
}
}
RedisConfiguration getRedisConfiguration(const ASTs & engine_args, ContextPtr context)
{
RedisConfiguration configuration;
configuration.db_index = 0;
configuration.password = "";
configuration.storage_type = RedisStorageType::SIMPLE;
configuration.pool_size = 10;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
validateNamedCollection(
*named_collection,
ValidateKeysMultiset<RedisEqualKeysSet>{"host", "port", "hostname", "password", "db_index", "storage_type", "pool_size"},
{});
configuration.host = named_collection->getAny<String>({"host", "hostname"});
configuration.port = static_cast<uint32_t>(named_collection->getOrDefault<UInt64>("port", 6379));
if (engine_args.size() > 1)
configuration.password = named_collection->get<String>("password");
if (engine_args.size() > 2)
configuration.db_index = static_cast<uint32_t>(named_collection->get<UInt64>("db_index"));
if (engine_args.size() > 3)
configuration.storage_type = parseStorageType(named_collection->get<String>("storage_type"));
if (engine_args.size() > 4)
configuration.pool_size = static_cast<uint32_t>(named_collection->get<UInt64>("pool_size"));
}
else
{
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
/// 6379 is the default Redis port.
auto parsed_host_port = parseAddress(checkAndGetLiteralArgument<String>(engine_args[0], "host:port"), 6379);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
if (engine_args.size() > 1)
configuration.db_index = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(engine_args[1], "db_index"));
if (engine_args.size() > 2)
configuration.password = checkAndGetLiteralArgument<String>(engine_args[2], "password");
if (engine_args.size() > 3)
configuration.storage_type = parseStorageType(checkAndGetLiteralArgument<String>(engine_args[3], "storage_type"));
if (engine_args.size() > 4)
configuration.pool_size = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(engine_args[4], "pool_size"));
}
if (configuration.storage_type == RedisStorageType::UNKNOWN)
throw Exception(ErrorCodes::INVALID_REDIS_STORAGE_TYPE, "Invalid Redis storage type");
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
return configuration;
}
void checkRedisTableStructure(const ColumnsDescription & columns, const RedisConfiguration & configuration)
{
/// TODO check data type

View File

@ -83,6 +83,9 @@ RedisArrayPtr getRedisHashMapKeys(const RedisConnectionPtr & connection, RedisAr
/// HASH_MAP: all_columns must have 2 items and the first one is Redis key the second is field, the third is value.
RedisColumnType getRedisColumnType(RedisStorageType storage_type, const Names & all_columns, const String & column);
/// parse redis table engine/function configuration from engine_args
RedisConfiguration getRedisConfiguration(const ASTs & engine_args, ContextPtr context);
/// checking Redis table/table-function when creating
void checkRedisTableStructure(const ColumnsDescription & columns, const RedisConfiguration & configuration);

View File

@ -162,60 +162,6 @@ SinkToStoragePtr StorageRedis::write(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is unsupported for StorageRedis");
}
/// TODO make "password", "db_index", "storage_type", "pool_size" optional
RedisConfiguration StorageRedis::getConfiguration(ASTs engine_args, ContextPtr context)
{
RedisConfiguration configuration;
configuration.db_index = 0;
configuration.password = "";
configuration.storage_type = RedisStorageType::SIMPLE;
configuration.pool_size = 10;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context))
{
validateNamedCollection(
*named_collection,
ValidateKeysMultiset<RedisEqualKeysSet>{"host", "port", "hostname", "password", "db_index", "storage_type", "pool_size"},
{});
configuration.host = named_collection->getAny<String>({"host", "hostname"});
configuration.port = static_cast<uint32_t>(named_collection->getOrDefault<UInt64>("port", 6379));
if (engine_args.size() > 1)
configuration.password = named_collection->get<String>("password");
if (engine_args.size() > 2)
configuration.db_index = static_cast<uint32_t>(named_collection->get<UInt64>("db_index"));
if (engine_args.size() > 3)
configuration.storage_type = parseStorageType(named_collection->get<String>("storage_type"));
if (engine_args.size() > 4)
configuration.pool_size = static_cast<uint32_t>(named_collection->get<UInt64>("pool_size"));
}
else
{
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
/// 6379 is the default Redis port.
auto parsed_host_port = parseAddress(checkAndGetLiteralArgument<String>(engine_args[0], "host:port"), 6379);
configuration.host = parsed_host_port.first;
configuration.port = parsed_host_port.second;
if (engine_args.size() > 1)
configuration.db_index = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(engine_args[1], "db_index"));
if (engine_args.size() > 2)
configuration.password = checkAndGetLiteralArgument<String>(engine_args[2], "password");
if (engine_args.size() > 3)
configuration.storage_type = parseStorageType(checkAndGetLiteralArgument<String>(engine_args[3], "storage_type"));
if (engine_args.size() > 4)
configuration.pool_size = static_cast<uint32_t>(checkAndGetLiteralArgument<UInt64>(engine_args[4], "pool_size"));
}
if (configuration.storage_type == RedisStorageType::UNKNOWN)
throw Exception(ErrorCodes::INVALID_REDIS_STORAGE_TYPE, "Invalid Redis storage type");
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
return configuration;
}
void registerStorageRedis(StorageFactory & factory)
{
factory.registerStorage(

View File

@ -40,8 +40,6 @@ public:
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
static RedisConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
private:
StorageID table_id;
RedisConfiguration configuration;

View File

@ -74,7 +74,7 @@ void TableFunctionRedis::parseArguments(const ASTPtr & ast_function, ContextPtr
"Table function 'Redis' requires from 5 parameters: "
"redis('host:port', db_index, 'password', 'storage_type', 'pool_size')");
}
configuration = StorageRedis::getConfiguration(args, context);
configuration = getRedisConfiguration(args, context);
}