Fix disk issues

This commit is contained in:
Oleg Ershov 2020-05-15 20:55:18 +03:00
parent ae715a8d1b
commit dd210cfeaf
2 changed files with 64 additions and 11 deletions

View File

@ -97,6 +97,7 @@ namespace
UInt32 s3_objects_count;
readIntText(s3_objects_count, buf);
std::cerr << "Metadata create, objects count " << s3_objects_count << std::endl;
assertChar('\t', buf);
readIntText(total_size, buf);
assertChar('\n', buf);
@ -214,6 +215,7 @@ namespace
std::cerr << "MetaData path and size " << path << " " << size << std::endl;
if (size > offset)
{
std::cerr << "Make ReadBuffer from " << (hdfs_name + path) << std::endl;
auto buf = std::make_unique<ReadBufferFromHDFS>(hdfs_name + path);
std::cerr << "Make offset " << offset << std::endl;
buf->seek(offset, SEEK_SET);
@ -514,7 +516,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
&Logger::get("DiskHDFS"),
"Read from file by path: " << backQuote(metadata_path + path) << " Existing HDFS objects: " << metadata.s3_objects.size());
return std::make_unique<ReadIndirectBufferFromHDFS>(hdfs_name + path, "", metadata, buf_size);
return std::make_unique<ReadIndirectBufferFromHDFS>(hdfs_name, "", metadata, buf_size);
}
std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
@ -531,7 +533,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
std::cerr << metadata_path << std::endl;
Metadata metadata(metadata_path, path, true);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
// metadata.save();
LOG_DEBUG(&Logger::get("DiskHDFS"), "Write to file by path: " << backQuote(metadata_path + path) << " New HDFS path: " << HDFS_path);
@ -565,7 +567,7 @@ void DiskHDFS::remove(const String & path)
file.remove();
for (const auto & [s3_object_path, _] : metadata.s3_objects)
{
auto hdfs_path = "gtest/" + s3_object_path;
auto hdfs_path = "/gtest/" + s3_object_path;
int res = hdfsDelete(fs.get(), hdfs_path.c_str(), 0);
if (res == -1)
throw Exception("fuck " + hdfs_path, 1);
@ -696,8 +698,24 @@ void registerDiskHDFS(DiskFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
return std::make_shared<DiskHDFS>(name, path, "");
const auto * disk_config = config.createView(config_prefix);
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();
DB::String uri{disk_config->getString("endpoint")};
if (uri.back() != '/')
throw Exception("HDFS path must ends with '/', but '" + uri + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
// String metadata_path = context.getPath() + "disks/" + name + "/";
String metadata_path = "/home/ershov-ov/metadata/";
return std::make_shared<DiskHDFS>(
name,
uri,
metadata_path
);
};
factory.registerDiskType("hdfs", creator);
}

View File

@ -148,10 +148,45 @@ TYPED_TEST(DiskTest, iterateDirectory)
TEST(DiskHdfsTest, testHdfsCreation)
{
auto disk = DB::DiskHDFS("gtesthdfs", "hdfs://localhost:9010/gtest/", "/home/ershov-ov/metadata/");
auto out = disk.writeFile("keek", 1024, DB::WriteMode::Rewrite, 1024, 1024);
writeString("test data", *out);
DB::String d;
auto in = disk.readFile("keek", 1024, 1024, 1024, 1024);
readString(d, *in);
EXPECT_EQ("test_data", d);
{
auto out = disk.writeFile("keek", 1024, DB::WriteMode::Rewrite, 1024, 1024);
writeString("test data", *out);
}
{
DB::String d;
auto in = disk.readFile("keek", 1024, 1024, 1024, 1024);
readString(d, *in);
EXPECT_EQ("test data", d);
}
{
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile("test_file", 1024, DB::WriteMode::Rewrite, 1024, 1024);
writeString("test data", *out);
}
// Test SEEK_SET
{
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile("test_file", 1024, 1024, 1024, 1024);
in->seek(5, SEEK_SET);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile("test_file", 1024, 1024, 1024, 1024);
String buf(4, '0');
in->readStrict(buf.data(), 4);
EXPECT_EQ("test", buf);
// Skip whitespace
in->seek(1, SEEK_CUR);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
}