This commit is contained in:
stavrolia 2019-09-05 17:42:17 +03:00
parent 9c87e39ca2
commit 587a780ba2
6 changed files with 36 additions and 17 deletions

View File

@ -1,4 +1,5 @@
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#if USE_HDFS
#include <Common/Exception.h>
@ -11,8 +12,9 @@ extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str)
{
const Poco::URI uri(uri_str);
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();

View File

@ -1,7 +1,6 @@
#include <Common/config.h>
#include <memory>
#include <type_traits>
#include <Poco/URI.h>
#if USE_HDFS
#include <hdfs/hdfs.h>
@ -52,7 +51,7 @@ using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsD
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & hdfs_uri);
HDFSBuilderPtr createHDFSBuilder(const std::string & hdfs_uri);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif

View File

@ -2,7 +2,6 @@
#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
@ -16,7 +15,7 @@ namespace ErrorCodes
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
std::string hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
@ -26,8 +25,8 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
if (fin == nullptr)
@ -39,7 +38,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception("Fail to read HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_read;
}

View File

@ -2,7 +2,6 @@
#if USE_HDFS
#include <Poco/URI.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -21,7 +20,7 @@ extern const int CANNOT_FSYNC;
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
std::string hdfs_uri;
hdfsFile fout;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
@ -31,7 +30,11 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
if (path.find("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0);
if (fout == nullptr)
@ -52,7 +55,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
int bytes_written = hdfsWrite(fs.get(), fout, start, size);
if (bytes_written < 0)
throw Exception("Fail to write HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_written;
}
@ -61,7 +64,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
int result = hdfsSync(fs.get(), fout);
if (result < 0)
throwFromErrno("Cannot HDFS sync" + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throwFromErrno("Cannot HDFS sync" + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_FSYNC);
}
};

View File

@ -192,7 +192,7 @@ BlockInputStreams StorageHDFS::read(
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
HDFSBuilderPtr builder = createHDFSBuilder(Poco::URI(uri_without_path + "/"));
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSFSPtr fs = createHDFSFS(builder.get());
const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);

View File

@ -28,15 +28,31 @@ def started_cluster():
cluster.shutdown()
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = HDFSApi("root")
for i in ["1", "2", "3"]:
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
node1.query("create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query("create table HDFSStorageWithEnum (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1,2,3,4,5}', 'TSV')")
node1.query("create table HDFSStorageWithQuestionMark (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage?', 'TSV')")
node1.query("create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
assert node1.query("select count(*) from HDFSStorageWithRange") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithEnum") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithQuestionMark") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithAsterisk") == '3\n'
def test_read_write_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"