This commit is contained in:
taiyang-li 2022-06-07 09:58:29 +08:00
parent b36d9f8143
commit c65c56fd48
9 changed files with 18 additions and 28 deletions

View File

@ -8,16 +8,9 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/config.h> #include <Common/config.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#if USE_HDFS
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#endif
#include <future> #include <future>
#include <iostream>
namespace ProfileEvents namespace ProfileEvents

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include <IO/AsynchronousReader.h> #include <IO/AsynchronousReader.h>
#include <IO/ReadBuffer.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
namespace DB namespace DB
{ {

View File

@ -6,6 +6,7 @@
#if USE_ARROW || USE_ORC || USE_PARQUET #if USE_ARROW || USE_ORC || USE_PARQUET
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromFileDescriptor.h> #include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/copyData.h> #include <IO/copyData.h>
@ -83,11 +84,13 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out) arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{ {
LOG_TEST(log, "object:{} size:{} read {} bytes", reinterpret_cast<std::uintptr_t>(this), GetSize().ValueOrDie(), nbytes);
return in.readBig(reinterpret_cast<char *>(out), nbytes); return in.readBig(reinterpret_cast<char *>(out), nbytes);
} }
arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes) arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes)
{ {
LOG_TEST(log, "object:{} size:{} read {} bytes", reinterpret_cast<std::uintptr_t>(this), GetSize().ValueOrDie(), nbytes);
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes)) ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes))
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())) ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()))
@ -99,6 +102,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBu
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position) arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{ {
LOG_TEST(log, "object:{} size:{} seek to {}", reinterpret_cast<std::uintptr_t>(this), GetSize().ValueOrDie(), position);
seekable_in.seek(position, SEEK_SET); seekable_in.seek(position, SEEK_SET);
return arrow::Status::OK(); return arrow::Status::OK();
} }

View File

@ -3,9 +3,11 @@
#if USE_ARROW || USE_ORC || USE_PARQUET #if USE_ARROW || USE_ORC || USE_PARQUET
#include <arrow/io/interfaces.h>
#include <optional> #include <optional>
#include <arrow/io/interfaces.h>
#include <Poco/Logger.h>
#define ORC_MAGIC_BYTES "ORC" #define ORC_MAGIC_BYTES "ORC"
#define PARQUET_MAGIC_BYTES "PAR1" #define PARQUET_MAGIC_BYTES "PAR1"
#define ARROW_MAGIC_BYTES "ARROW1" #define ARROW_MAGIC_BYTES "ARROW1"
@ -68,6 +70,7 @@ private:
SeekableReadBuffer & seekable_in; SeekableReadBuffer & seekable_in;
std::optional<off_t> file_size; std::optional<off_t> file_size;
bool is_open = false; bool is_open = false;
Poco::Logger * log = &Poco::Logger::get("RandomAccessFileFromSeekableReadBuffer");
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer); ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
}; };

View File

@ -79,6 +79,8 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncRe
void AsynchronousReadBufferFromHDFS::prefetch() void AsynchronousReadBufferFromHDFS::prefetch()
{ {
interval_watch.restart();
if (prefetch_future.valid()) if (prefetch_future.valid())
return; return;
@ -162,7 +164,6 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
sum_duration += next_watch.elapsedMicroseconds(); sum_duration += next_watch.elapsedMicroseconds();
sum_wait += wait; sum_wait += wait;
interval_watch.restart();
return size; return size;
} }

View File

@ -912,14 +912,6 @@ StorageHive::totalRowsImpl(const Settings & settings, const SelectQueryInfo & qu
return total_rows; return total_rows;
} }
AsynchronousReaderPtr StorageHive::getThreadPoolReader()
{
constexpr size_t pool_size = 50;
constexpr size_t queue_size = 1000000;
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(pool_size, queue_size);
return reader;
}
void registerStorageHive(StorageFactory & factory) void registerStorageHive(StorageFactory & factory)
{ {
factory.registerStorage( factory.registerStorage(

View File

@ -25,8 +25,6 @@ class HiveSettings;
class StorageHive final : public IStorage, WithContext class StorageHive final : public IStorage, WithContext
{ {
public: public:
static AsynchronousReaderPtr getThreadPoolReader();
friend class StorageHiveSource; friend class StorageHiveSource;
StorageHive( StorageHive(

View File

@ -10,7 +10,7 @@ target_link_libraries (get_current_inserts_in_replicated PRIVATE dbms clickhouse
add_executable (get_abandonable_lock_in_all_partitions get_abandonable_lock_in_all_partitions.cpp) add_executable (get_abandonable_lock_in_all_partitions get_abandonable_lock_in_all_partitions.cpp)
target_link_libraries (get_abandonable_lock_in_all_partitions PRIVATE dbms clickhouse_common_config clickhouse_common_zookeeper) target_link_libraries (get_abandonable_lock_in_all_partitions PRIVATE dbms clickhouse_common_config clickhouse_common_zookeeper)
if (TARGET ch_contrib::hivemetastore) if (TARGET ch_contrib::hdfs)
add_executable (async_read_buffer_from_hdfs async_read_buffer_from_hdfs.cpp) add_executable (async_read_buffer_from_hdfs async_read_buffer_from_hdfs.cpp)
target_link_libraries (async_read_buffer_from_hdfs PRIVATE dbms ch_contrib::hivemetastore ch_contrib::hdfs ch_contrib::parquet) target_link_libraries (async_read_buffer_from_hdfs PRIVATE dbms ch_contrib::hdfs)
endif () endif ()

View File

@ -1,21 +1,20 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <filesystem>
#include <IO/ReadBufferFromString.h>
#include <IO/SnappyReadBuffer.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/copyData.h> #include <IO/copyData.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/Hive/StorageHive.h>
#include <Common/Config/ConfigProcessor.h> #include <Common/Config/ConfigProcessor.h>
#include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h> #include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h>
int main() int main()
{ {
using namespace DB; using namespace DB;
namespace fs = std::filesystem;
String config_path = "/path/to/config/file"; String config_path = "/path/to/config/file";
ConfigProcessor config_processor(config_path, false, true); ConfigProcessor config_processor(config_path, false, true);
config_processor.setConfigPath(fs::path(config_path).parent_path()); config_processor.setConfigPath(fs::path(config_path).parent_path());
@ -25,7 +24,7 @@ int main()
String hdfs_namenode_url = "hdfs://namenode:port/"; String hdfs_namenode_url = "hdfs://namenode:port/";
String path = "/path/to/hdfs/file"; String path = "/path/to/hdfs/file";
auto in = std::make_unique<ReadBufferFromHDFS>(hdfs_namenode_url, path, *config); auto in = std::make_unique<ReadBufferFromHDFS>(hdfs_namenode_url, path, *config);
auto reader = StorageHive::getThreadPoolReader(); auto reader = IObjectStorage::getThreadPoolReader();
AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in)); AsynchronousReadBufferFromHDFS buf(reader, {}, std::move(in));
String output; String output;