Mirror of https://github.com/ClickHouse/ClickHouse.git
Commit 21f1c4d1ce (parent 33d5f0c8dd)
Better code and tests for bad cases
dbms/src/IO/HDFSCommon.cpp (new file)
@@ -0,0 +1,43 @@
+#include <IO/HDFSCommon.h>
+
+#if USE_HDFS
+#include <Common/Exception.h>
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+    extern const int NETWORK_ERROR;
+}
+HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
+{
+    auto & host = uri.getHost();
+    auto port = uri.getPort();
+    auto & path = uri.getPath();
+    if (host.empty() || port == 0 || path.empty())
+        throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
+
+    HDFSBuilderPtr builder(hdfsNewBuilder());
+    if (builder == nullptr)
+        throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
+            ErrorCodes::NETWORK_ERROR);
+    hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
+    hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
+    hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
+
+    hdfsBuilderSetNameNode(builder.get(), host.c_str());
+    hdfsBuilderSetNameNodePort(builder.get(), port);
+    return builder;
+}
+
+HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
+{
+    HDFSFSPtr fs(hdfsBuilderConnect(builder));
+    if (fs == nullptr)
+        throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
+            ErrorCodes::NETWORK_ERROR);
+
+    return fs;
+}
+}
+#endif
dbms/src/IO/HDFSCommon.h (new file)
@@ -0,0 +1,38 @@
+#include <Common/config.h>
+#include <memory>
+#include <type_traits>
+#include <Poco/URI.h>
+
+#if USE_HDFS
+#include <hdfs/hdfs.h>
+
+namespace DB
+{
+namespace detail
+{
+struct HDFSBuilderDeleter
+{
+    void operator()(hdfsBuilder * builder_ptr)
+    {
+        hdfsFreeBuilder(builder_ptr);
+    }
+};
+struct HDFSFsDeleter
+{
+    void operator()(hdfsFS fs_ptr)
+    {
+        hdfsDisconnect(fs_ptr);
+    }
+};
+
+}
+
+using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
+using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
+
+// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
+/// TODO Allow to tune from query Settings.
+HDFSBuilderPtr createHDFSBuilder(const Poco::URI & hdfs_uri);
+HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
+}
+#endif
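Taken together, the two new files give callers a short, exception-safe connection sequence: the unique_ptr deleters guarantee hdfsFreeBuilder and hdfsDisconnect run even when an exception is thrown. A minimal usage sketch, not part of the commit (the namenode address and function name are made up):

    #include <IO/HDFSCommon.h>
    #include <Poco/URI.h>

    // Hypothetical call site: validate the URI, configure the builder, connect.
    void connectExample()
    {
        Poco::URI uri("hdfs://namenode:9000/some/file.tsv");      // made-up address
        DB::HDFSBuilderPtr builder = DB::createHDFSBuilder(uri);  // throws BAD_ARGUMENTS / NETWORK_ERROR
        DB::HDFSFSPtr fs = DB::createHDFSFS(builder.get());       // throws NETWORK_ERROR if connect fails
        // Both handles are released automatically: fs (hdfsDisconnect) before builder (hdfsFreeBuilder).
    }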
dbms/src/IO/ReadBufferFromHDFS.cpp
@@ -2,82 +2,51 @@

 #if USE_HDFS
 #include <Poco/URI.h>
+#include <IO/HDFSCommon.h>
 #include <hdfs/hdfs.h>

 namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
     extern const int NETWORK_ERROR;
+    extern const int CANNOT_OPEN_FILE;
 }

 struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
 {
-    struct HDFSBuilderDeleter
-    {
-        void operator()(hdfsBuilder * builder_ptr)
-        {
-            hdfsFreeBuilder(builder_ptr);
-        }
-    };
-
-    std::string hdfs_uri;
-    std::unique_ptr<hdfsBuilder, HDFSBuilderDeleter> builder;
-    hdfsFS fs;
+    Poco::URI hdfs_uri;
     hdfsFile fin;
+
+    HDFSBuilderPtr builder;
+    HDFSFSPtr fs;

     ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
         : hdfs_uri(hdfs_name_)
-        , builder(hdfsNewBuilder())
+        , builder(createHDFSBuilder(hdfs_uri))
+        , fs(createHDFSFS(builder.get()))
     {
-        Poco::URI uri(hdfs_name_);
-        auto & host = uri.getHost();
-        auto port = uri.getPort();
-        auto & path = uri.getPath();
-        if (host.empty() || port == 0 || path.empty())
-        {
-            throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
-        }
-        // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
-        /// TODO Allow to tune from query Settings.
-        hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
-        hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
-        hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
-
-        hdfsBuilderSetNameNode(builder.get(), host.c_str());
-        hdfsBuilderSetNameNodePort(builder.get(), port);
-        fs = hdfsBuilderConnect(builder.get());
-
-        if (fs == nullptr)
-        {
-            throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
-        }
-
-        fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
+        auto & path = hdfs_uri.getPath();
+        fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);

         if (fin == nullptr)
-        {
             throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
-        }
-    }
-
-    ~ReadBufferFromHDFSImpl()
-    {
-        hdfsCloseFile(fs, fin);
-        hdfsDisconnect(fs);
+                ErrorCodes::CANNOT_OPEN_FILE);
     }

     int read(char * start, size_t size)
     {
-        int bytes_read = hdfsRead(fs, fin, start, size);
+        int bytes_read = hdfsRead(fs.get(), fin, start, size);
         if (bytes_read < 0)
-            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
+            throw Exception("Fail to read HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
                 ErrorCodes::NETWORK_ERROR);
         return bytes_read;
     }
+
+    ~ReadBufferFromHDFSImpl()
+    {
+        hdfsCloseFile(fs.get(), fin);
+    }
 };

 ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
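One subtlety the rewritten constructor relies on: non-static data members are initialized in declaration order and destroyed in reverse, so builder is ready before fs is constructed from builder.get(), and on destruction the filesystem is disconnected before the builder is freed. A self-contained sketch of the same pattern with stand-in types (nothing below is from the commit):

    #include <cstdio>
    #include <memory>

    struct Builder {};                       // stand-in for hdfsBuilder
    struct Fs {};                            // stand-in for the hdfsFS handle
    Builder * newBuilder() { return new Builder; }
    Fs * connect(Builder *) { return new Fs; }

    struct BuilderDeleter { void operator()(Builder * b) const { std::puts("free builder"); delete b; } };
    struct FsDeleter { void operator()(Fs * f) const { std::puts("disconnect"); delete f; } };

    struct Impl
    {
        // Declaration order drives initialization order: builder first, then fs.
        std::unique_ptr<Builder, BuilderDeleter> builder;
        std::unique_ptr<Fs, FsDeleter> fs;

        Impl() : builder(newBuilder()), fs(connect(builder.get())) {}
    };

    int main() { Impl impl; }                // prints "disconnect", then "free builder"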
dbms/src/IO/WriteBufferFromHDFS.cpp
@@ -2,6 +2,7 @@

 #if USE_HDFS
 #include <Poco/URI.h>
+#include <IO/HDFSCommon.h>
 #include <hdfs/hdfs.h>

 namespace DB
@@ -9,86 +10,55 @@ namespace DB

 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
     extern const int NETWORK_ERROR;
+    extern const int CANNOT_OPEN_FILE;
     extern const int CANNOT_FSYNC;
 }


 struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
 {
-    struct HDFSBuilderDeleter
-    {
-        void operator()(hdfsBuilder * builder_ptr)
-        {
-            hdfsFreeBuilder(builder_ptr);
-        }
-    };
-
-    std::string hdfs_uri;
-    std::unique_ptr<hdfsBuilder, HDFSBuilderDeleter> builder;
-    hdfsFS fs;
+    Poco::URI hdfs_uri;
     hdfsFile fout;
+    HDFSBuilderPtr builder;
+    HDFSFSPtr fs;

     WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
         : hdfs_uri(hdfs_name_)
-        , builder(hdfsNewBuilder())
+        , builder(createHDFSBuilder(hdfs_uri))
+        , fs(createHDFSFS(builder.get()))
     {
-        Poco::URI uri(hdfs_name_);
-        auto & host = uri.getHost();
-        auto port = uri.getPort();
-        auto & path = uri.getPath();
-        if (host.empty() || port == 0 || path.empty())
-        {
-            throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
-        }
-
-        // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
-        /// TODO Allow to tune from query Settings.
-        hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
-        hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
-        hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
-
-        hdfsBuilderSetNameNode(builder.get(), host.c_str());
-        hdfsBuilderSetNameNodePort(builder.get(), port);
-        fs = hdfsBuilderConnect(builder.get());
-
-        if (fs == nullptr)
-        {
-            throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
-        }
-
-        fout = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
+        auto & path = hdfs_uri.getPath();
+        fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0);
+
         if (fout == nullptr)
        {
             throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
+                ErrorCodes::CANNOT_OPEN_FILE);
         }

     }

     ~WriteBufferFromHDFSImpl()
     {
-        hdfsCloseFile(fs, fout);
-        hdfsDisconnect(fs);
+        hdfsCloseFile(fs.get(), fout);
     }


     int write(const char * start, size_t size)
     {
-        int bytes_written = hdfsWrite(fs, fout, start, size);
+        int bytes_written = hdfsWrite(fs.get(), fout, start, size);
         if (bytes_written < 0)
-            throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
+            throw Exception("Fail to write HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
                 ErrorCodes::NETWORK_ERROR);
         return bytes_written;
     }

     void sync()
     {
-        int result = hdfsSync(fs, fout);
+        int result = hdfsSync(fs.get(), fout);
         if (result < 0)
-            throwFromErrno("Cannot HDFS sync" + hdfs_uri + " " + std::string(hdfsGetLastError()),
+            throwFromErrno("Cannot HDFS sync" + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
                 ErrorCodes::CANNOT_FSYNC);
     }
 };
dbms/tests/integration/test_storage_hdfs/test.py
@@ -56,3 +56,21 @@ def test_write_table(started_cluster):
     result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
     assert hdfs_api.read_data("/other_storage") == result
     assert node1.query("select * from OtherHDFSStorage order by id") == result
+
+def test_bad_hdfs_uri(started_cluster):
+    try:
+        node1.query("create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')")
+    except Exception as ex:
+        print ex
+        assert 'Illegal HDFS URI' in str(ex)
+    try:
+        node1.query("create table BadStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs100500:9000/other_storage', 'TSV')")
+    except Exception as ex:
+        print ex
+        assert 'Unable to create builder to connect to HDFS' in str(ex)
+
+    try:
+        node1.query("create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/<>', 'TSV')")
+    except Exception as ex:
+        print ex
+        assert 'Unable to open HDFS file' in str(ex)
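Each of the three bad tables probes a distinct failure stage in the refactored code path: 'Illegal HDFS URI' comes from the URI validation at the top of createHDFSBuilder (BAD_ARGUMENTS), 'Unable to create builder to connect to HDFS' from the hdfsNewBuilder failure check in the same function (NETWORK_ERROR), and 'Unable to open HDFS file' from the hdfsOpenFile check in the buffer constructors (now CANNOT_OPEN_FILE instead of NETWORK_ERROR).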