#pragma once

#include <Common/config.h>

#if USE_HDFS

#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <base/logger_useful.h>
#include <base/shared_ptr_helper.h>

namespace DB
{

/**
 * This class represents table engine for external hdfs files.
 * Read method is supported for now.
 */
class StorageHDFS final : public shared_ptr_helper<StorageHDFS>, public IStorage, WithContext
{
    friend struct shared_ptr_helper<StorageHDFS>;
public:
    String getName() const override { return "HDFS"; }

    Pipe read(
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;

    void truncate(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr local_context,
        TableExclusiveLockHolder &) override;

    NamesAndTypesList getVirtuals() const override;

    bool supportsPartitionBy() const override { return true; }

    /// Check if the format is column-oriented.
    /// It is useful because column-oriented formats can effectively skip unknown columns,
    /// so we can create a header of only the required columns in the read method and ask
    /// the format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
    bool isColumnOriented() const override;

    static ColumnsDescription getTableStructureFromData(
        const String & format,
        const String & uri,
        const String & compression_method,
        ContextPtr ctx);

protected:
    friend class HDFSSource;

    StorageHDFS(
        const String & uri_,
        const StorageID & table_id_,
        const String & format_name_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        const String & comment,
        ContextPtr context_,
        const String & compression_method_ = "",
        bool distributed_processing_ = false,
        ASTPtr partition_by = nullptr);

private:
    std::vector<String> uris;
    String format_name;
    String compression_method;
    const bool distributed_processing;
    ASTPtr partition_by;
    bool is_path_with_globs;
    NamesAndTypesList virtual_columns;

    Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
};

class PullingPipelineExecutor;

class HDFSSource : public SourceWithProgress, WithContext
{
public:
    class DisclosedGlobIterator
    {
    public:
        DisclosedGlobIterator(ContextPtr context_, const String & uri_);
        String next();
    private:
        class Impl;
        /// shared_ptr to have copy constructor
        std::shared_ptr<Impl> pimpl;
    };

    class URISIterator
    {
    public:
        URISIterator(const std::vector<String> & uris_, ContextPtr context);
        String next();
    private:
        class Impl;
        /// shared_ptr to have copy constructor
        std::shared_ptr<Impl> pimpl;
    };

    using IteratorWrapper = std::function<String()>;
    using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;

    static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);

    HDFSSource(
        StorageHDFSPtr storage_,
        const Block & block_for_format_,
        const std::vector<NameAndTypePair> & requested_virtual_columns_,
        ContextPtr context_,
        UInt64 max_block_size_,
        std::shared_ptr<IteratorWrapper> file_iterator_,
        ColumnsDescription columns_description_);

    String getName() const override;

    Chunk generate() override;

    void onCancel() override;

private:
    StorageHDFSPtr storage;
    Block block_for_format;
    std::vector<NameAndTypePair> requested_virtual_columns;
    UInt64 max_block_size;
    bool need_path_column;
    bool need_file_column;
    std::shared_ptr<IteratorWrapper> file_iterator;
    ColumnsDescription columns_description;

    std::unique_ptr<ReadBuffer> read_buf;
    std::unique_ptr<QueryPipeline> pipeline;
    std::unique_ptr<PullingPipelineExecutor> reader;
    /// onCancel and generate can be called concurrently.
    std::mutex reader_mutex;
    String current_path;

    /// Recreate ReadBuffer and PullingPipelineExecutor for each file.
    bool initialize();
};

}

#endif
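
/* Illustrative usage of the HDFS table engine from SQL, as a rough sketch only
 * (the host name and path below are hypothetical; see the ClickHouse documentation
 * for the authoritative syntax):
 *
 *   CREATE TABLE hdfs_table (name String, value UInt32)
 *       ENGINE = HDFS('hdfs://namenode:9000/some_dir/data.tsv', 'TSV');
 *
 *   -- The engine also exposes the _path and _file virtual columns,
 *   -- which correspond to getVirtuals() / need_path_column / need_file_column above.
 *   SELECT _path, _file, * FROM hdfs_table;
 */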