USE_INTERNAL_HDFS3_LIBRARY + bash lint

Ilya Golshtein 2020-10-29 23:40:47 +03:00
parent d9166a0645
commit d1d657335b
8 changed files with 89 additions and 43 deletions

View File

@ -5,6 +5,7 @@
#cmakedefine01 USE_RE2_ST
#cmakedefine01 USE_SSL
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_INTERNAL_HDFS3_LIBRARY
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
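Note: #cmakedefine01 makes CMake emit #define USE_INTERNAL_HDFS3_LIBRARY 0 or 1, so the flag can be tested with a plain #if, which is how the HDFSCommon.cpp hunk below guards hdfsBuilderSetPrincipal. A minimal sketch of that pattern (the helper name and include paths are illustrative, not part of this commit):

#include <string>
#include <hdfs/hdfs.h>      // builder API from libhdfs / bundled libhdfs3
#include <Common/config.h>  // defines USE_INTERNAL_HDFS3_LIBRARY as 0 or 1

// Hypothetical helper: only the bundled libhdfs3 exposes hdfsBuilderSetPrincipal,
// so the call is compiled in only when USE_INTERNAL_HDFS3_LIBRARY is 1.
static void setPrincipalIfSupported(hdfsBuilder * builder, const std::string & principal)
{
#if USE_INTERNAL_HDFS3_LIBRARY
    hdfsBuilderSetPrincipal(builder, principal.c_str());
#else
    (void)builder;
    (void)principal;
#endif
}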

View File

@ -14,11 +14,22 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & path)
// void HDFSBuilderWrapper::makeCachePath(const String & cachePath, String user)
// {
// if (hadoop_security_kerberos_ticket_cache_path.empty())
// {
// hadoop_security_kerberos_ticket_cache_path = cachePath + "KRB5CACHEPATH" + user;
// hdfsBuilderSetKerbTicketCachePath(hdfs_builder, hadoop_security_kerberos_ticket_cache_path.c_str());
// }
// }
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & config_path, bool isUser)
{
hdfsBuilderConfSetStr(hdfs_builder, "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(hdfs_builder, "input.write.timeout", "60000"); // 1 min
@ -29,10 +40,10 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(path, keys);
config.keys(config_path, keys);
for (const auto & key : keys)
{
const String key_path = path + "." + key;
const String key_path = config_path + "." + key;
String key_name;
if (key == "hadoop_kerberos_keytab")
@ -45,7 +56,10 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
{
needKinit = true;
hadoop_kerberos_principal = config.getString(key_path);
#if USE_INTERNAL_HDFS3_LIBRARY
hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
#endif
continue;
}
@ -57,14 +71,21 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
}
else if (key == "hadoop_security_kerberos_ticket_cache_path")
{
if (isUser)
{
throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
// standard param - pass to libhdfs3
// standard param - pass further
}
key_name = boost::replace_all_copy(key, "_", ".");
const auto & [k,v] = keep(key_name, config.getString(key_path));
hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
}
}
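Note on the key handling above: the XML keys under <hdfs> / <hdfs_<user>> are written with underscores and converted to the dotted Hadoop property names via boost::replace_all_copy(key, "_", "."); keep() copies both key and value into config_stor, presumably so that the raw const char * pointers handed to hdfsBuilderConfSetStr stay valid for the lifetime of the builder. A standalone sketch of the name mapping (key and value are just examples):

#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <string>

int main()
{
    // e.g. <hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
    const std::string xml_key = "hadoop_security_kerberos_ticket_cache_path";
    const std::string hadoop_key = boost::replace_all_copy(xml_key, "_", ".");
    std::cout << hadoop_key << '\n';  // prints hadoop.security.kerberos.ticket.cache.path
}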
@ -73,11 +94,15 @@ String HDFSBuilderWrapper::getKinitCmd()
std::stringstream ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ? String() : (String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
String() :
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
ss << hadoop_kerberos_kinit_command << cache_name << " -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
ss << hadoop_kerberos_kinit_command << cache_name <<
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
return ss.str();
}
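For reference, with the principal and ticket cache used elsewhere in this commit (root@TEST.CLICKHOUSE.TECH, /tmp/kerb_cache), a hypothetical keytab path, and assuming hadoop_kerberos_kinit_command is left as plain kinit, the composed command comes out roughly as

kinit -c "/tmp/kerb_cache" -R -t "/path/to/clickhouse.keytab" -k root@TEST.CLICKHOUSE.TECH|| kinit -c "/tmp/kerb_cache" -t "/path/to/clickhouse.keytab" -k root@TEST.CLICKHOUSE.TECH

i.e. a ticket renewal is attempted first, falling back to a fresh kinit if that fails (the code emits no space before ||, which the shell tolerates).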
@ -114,7 +139,8 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
HDFSBuilderWrapper builder;
if (builder.get() == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + String(hdfsGetLastError()),
throw Exception("Unable to create builder to connect to HDFS: " +
uri.toString() + " " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
// hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
// hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
@ -169,16 +195,33 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
// builder.makeCachePath(context.getUserFilesPath());
}
if (!user.empty())
{
String user_config_prefix = HDFSBuilderWrapper::CONFIG_PREFIX + "_" + user;
if (config.has(user_config_prefix))
builder.loadFromConfig(config, user_config_prefix);
{
#if USE_INTERNAL_HDFS3_LIBRARY
builder.loadFromConfig(config, user_config_prefix, true);
#else
throw Exception("Multi user HDFS configuration required internal libhdfs3",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
#endif
}
// builder.makeCachePath(context.getUserFilesPath(), user);
}
// else
// {
// builder.makeCachePath(context.getUserFilesPath());
// }
if (builder.needKinit)
{

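To summarize the lookup above: the global <hdfs> section is always applied, and when the URI carries a user (e.g. hdfs://suser@kerberizedhdfs1:9000/...), a section named hdfs_<user> is applied on top of it; this commit allows that second pass only with the bundled libhdfs3. A hypothetical helper (not from the commit) mirroring the prefix construction:

#include <string>

// Derives the per-user config prefix, e.g. ("hdfs", "suser") -> "hdfs_suser",
// matching the <hdfs_suser> section in the test config further below.
std::string userConfigPrefix(const std::string & base_prefix, const std::string & user)
{
    return user.empty() ? base_prefix : base_prefix + "_" + user;
}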
View File

@ -103,26 +103,23 @@ class HDFSBuilderWrapper
static std::mutex kinit_mtx;
/*mutable*/ std::vector<std::pair<String, String>> config_stor;
std::vector<std::pair<String, String>> config_stor;
std::pair<String, String>& keep(const String & k, const String & v)
{
return config_stor.emplace_back(std::make_pair(k, v));
}
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & path);
void loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & config_path, bool isUser = false);
String getKinitCmd();
bool needKinit{false};
bool
needKinit{false};
void runKinit();
void
runKinit();
void makeCachePath(const String & cachePath, String user = "");
static const String CONFIG_PREFIX;

View File

@ -37,8 +37,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
if (path.find_first_of("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
int flags = hdfsExists(fs.get(), path.c_str()) ? (O_WRONLY|O_APPEND) : O_WRONLY; /// O_WRONLY means create or overwrite, i.e. it implies O_TRUNC here
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 0);
// int flags = hdfsExists(fs.get(), path.c_str()) ? (O_WRONLY|O_SYNC) : (O_WRONLY|O_APPEND|O_SYNC); /// O_WRONLY means create or overwrite, i.e. it implies O_TRUNC here
// fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 1024*1024);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
if (fout == nullptr)
{

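A note on the hdfsExists check above: like most libhdfs calls it returns 0 on success, so !hdfsExists(fs, path) means the path already exists; the rewritten code therefore rejects writes to an existing file instead of appending to it as before. A hypothetical wrapper (not part of the commit) spelling that out:

#include <hdfs/hdfs.h>

// hdfsExists() follows the 0-on-success convention, so a zero result means "path exists".
static bool hdfsPathExists(hdfsFS fs, const char * path)
{
    return hdfsExists(fs, path) == 0;
}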
View File

@ -187,7 +187,6 @@ def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
def test_write_gz_storage(started_cluster):
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
@ -196,7 +195,6 @@ def test_write_gz_storage(started_cluster):
def test_write_gzip_storage(started_cluster):
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")

View File

@ -4,9 +4,9 @@
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<hdfs_specuser>
<hdfs_suser>
<hadoop_kerberos_principal>specuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_specuser>
</hdfs_suser>
<hdfs_dedicatedcachepath>
<hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs_dedicatedcachepath>

View File

@ -1,7 +1,6 @@
#!/bin/bash
: ${HADOOP_PREFIX:=/usr/local/hadoop}
: "${HADOOP_PREFIX:=/usr/local/hadoop}"
cat >> $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh <<EOF
export HADOOP_SECURE_DN_USER=hdfs
@ -11,16 +10,18 @@ export JSVC_HOME=$HADOOP_PREFIX/sbin
EOF
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
mkdir -p ${HADOOP_SECURE_DN_PID_DIR}
mkdir -p ${HADOOP_SECURE_DN_LOG_DIR}
mkdir -p "${HADOOP_SECURE_DN_PID_DIR}"
mkdir -p "${HADOOP_SECURE_DN_LOG_DIR}"
rm /tmp/*.pid
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do echo == $cp; curl -LO $cp ; done; cd -
cd "${HADOOP_PREFIX}/share/hadoop/common" || exit
for cp in ${ACP//,/ }; do echo "== ${cp}"; curl -LO "${cp}" ; done;
cd - || exit
# altering the core-site configuration
sed s/HOSTNAME/$HOSTNAME/ /usr/local/hadoop/etc/hadoop/core-site.xml.template | grep -v '/configuration' > /usr/local/hadoop/etc/hadoop/core-site.xml
sed "s/HOSTNAME/${HOSTNAME}/" /usr/local/hadoop/etc/hadoop/core-site.xml.template | grep -v '/configuration' > /usr/local/hadoop/etc/hadoop/core-site.xml
cat >> /usr/local/hadoop/etc/hadoop/core-site.xml << EOF
<property>

View File

@ -69,7 +69,7 @@ def test_write_storage_expired(started_cluster):
def test_prohibited(started_cluster):
node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://specuser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
try:
node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
assert False, "Exception has to be thrown"
@ -81,30 +81,32 @@ def test_two_users(started_cluster):
node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
node1.query("insert into HDFSStorOne values (1, 'IlyaReal', 86.00)")
node1.query("create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://specuser@kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
node1.query("create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
node1.query("insert into HDFSStorTwo values (1, 'IlyaIdeal', 74.00)")
select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
print("select_read_1", select_read_1)
select_read_2 = node1.query("select * from hdfs('hdfs://specuser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
print("select_read_2", select_read_2)
node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
try:
node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex)
# node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
# try:
# node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)")
# assert False, "Exception has to be thrown"
# except Exception as ex:
# print ex
# assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex)
def test_cache_path(started_cluster):
node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
try:
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)
api_read = started_cluster.hdfs_api.read_data("/storage_dedicated_cache_path")
print("api_read", api_read)
assert api_read == "1\tFatMark\t92.53\n"
def test_read_table_not_expired(started_cluster):