diff --git a/docs/en/sql-reference/table-functions/hdfsCluster.md b/docs/en/sql-reference/table-functions/hdfsCluster.md new file mode 100644 index 00000000000..6183fe83c38 --- /dev/null +++ b/docs/en/sql-reference/table-functions/hdfsCluster.md @@ -0,0 +1,58 @@
+---
+toc_priority: 55
+toc_title: hdfsCluster
+---
+
+# hdfsCluster Table Function {#hdfsCluster-table-function}
+
+Allows processing files from HDFS in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster, expands the asterisks in the HDFS file path, and dispatches each file dynamically. On a worker node it asks the initiator for the next task to process and processes it. This is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+hdfsCluster(cluster_name, URI, format, structure)
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+- `URI` — URI to a file or a bunch of files. Supports the following wildcards in readonly mode: `*`, `?`, `{'abc','def'}` and `{N..M}` where `N`, `M` — numbers, `abc`, `def` — strings. For more information see [Wildcards In Path](../../engines/table-engines/integrations/s3.md#wildcards-in-path).
+- `format` — The [format](../../interfaces/formats.md#formats) of the file.
+- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
+
+**Returned value**
+
+A table with the specified structure for reading data from the specified files.
+
+**Examples**
+
+1. Suppose that we have a ClickHouse cluster named `cluster_simple`, and several files with the following URIs on HDFS:
+
+- ‘hdfs://hdfs1:9000/some_dir/some_file_1’
+- ‘hdfs://hdfs1:9000/some_dir/some_file_2’
+- ‘hdfs://hdfs1:9000/some_dir/some_file_3’
+- ‘hdfs://hdfs1:9000/another_dir/some_file_1’
+- ‘hdfs://hdfs1:9000/another_dir/some_file_2’
+- ‘hdfs://hdfs1:9000/another_dir/some_file_3’
+
+2. Query the number of rows in these files:
+
+``` sql
+SELECT count(*)
+FROM hdfsCluster('cluster_simple', 'hdfs://hdfs1:9000/{some,another}_dir/some_file_{1..3}', 'TSV', 'name String, value UInt32')
+```
+
+3. Query the number of rows in all files of these two directories:
+
+``` sql
+SELECT count(*)
+FROM hdfsCluster('cluster_simple', 'hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV', 'name String, value UInt32')
+```
+
+!!! warning "Warning"
+    If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
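+
+For illustration, suppose a directory contains zero-padded files `some_file_001`, …, `some_file_999` (hypothetical names, not the files from the example above). A pattern like `some_file_{1..999}` would not account for the leading zeros, so write each digit with its own braces instead:
+
+``` sql
+SELECT count(*)
+FROM hdfsCluster('cluster_simple', 'hdfs://hdfs1:9000/some_dir/some_file_{0..9}{0..9}{0..9}', 'TSV', 'name String, value UInt32')
+```
+
+The pattern `some_file_???` matches the same file names using `?` for each digit.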
+ +**See Also** + +- [HDFS engine](../../engines/table-engines/integrations/hdfs.md) +- [HDFS table function](../../sql-reference/table-functions/hdfs.md) diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 70aa3d28174..ce9fc5c8129 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -54,6 +53,8 @@ namespace ErrorCodes extern const int ACCESS_DENIED; } +Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match); + StorageHDFS::StorageHDFS( const String & uri_, const StorageID & table_id_, @@ -62,13 +63,15 @@ StorageHDFS::StorageHDFS( const ConstraintsDescription & constraints_, const String & comment, ContextPtr context_, - const String & compression_method_ = "", + const String & compression_method_, + const bool distributed_processing_, ASTPtr partition_by_) : IStorage(table_id_) , WithContext(context_) , uri(uri_) , format_name(format_name_) , compression_method(compression_method_) + , distributed_processing(distributed_processing_) , partition_by(partition_by_) { context_->getRemoteHostFilter().checkURL(Poco::URI(uri)); @@ -81,154 +84,176 @@ StorageHDFS::StorageHDFS( setInMemoryMetadata(storage_metadata); } -using StorageHDFSPtr = std::shared_ptr; - -class HDFSSource : public SourceWithProgress, WithContext +class HDFSSource::DisclosedGlobIterator::Impl { public: - struct SourcesInfo + Impl(ContextPtr context_, const String & uri) { - std::vector uris; - std::atomic next_uri_to_read = 0; + const size_t begin_of_path = uri.find('/', uri.find("//") + 2); + const String path_from_uri = uri.substr(begin_of_path); + const String uri_without_path = uri.substr(0, begin_of_path); /// ends without '/' - bool need_path_column = false; - bool need_file_column = false; - }; - - using SourcesInfoPtr = std::shared_ptr; - - static Block getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column) - { - auto header = metadata_snapshot->getSampleBlock(); - - /// Note: AddingDefaultsBlockInputStream doesn't change header. - - if (need_path_column) - header.insert({DataTypeString().createColumn(), std::make_shared(), "_path"}); - if (need_file_column) - header.insert({DataTypeString().createColumn(), std::make_shared(), "_file"}); - - return header; + HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef()); + HDFSFSPtr fs = createHDFSFS(builder.get()); + std::lock_guard lock(mutex); + uris = LSWithRegexpMatching("/", fs, path_from_uri); + for (size_t i=0; igetSampleBlock(); + /// Note: AddingDefaultsBlockInputStream doesn't change header. 
+ if (need_path_column) + header.insert({DataTypeString().createColumn(), std::make_shared(), "_path"}); + if (need_file_column) + header.insert({DataTypeString().createColumn(), std::make_shared(), "_file"}); + return header; +} + +Block HDFSSource::getBlockForSource( + const StorageHDFSPtr & storage, + const StorageMetadataPtr & metadata_snapshot, + const ColumnsDescription & columns_description, + bool need_path_column, + bool need_file_column) +{ + if (storage->isColumnOriented()) + return metadata_snapshot->getSampleBlockForColumns( + columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID()); + else + return getHeader(metadata_snapshot, need_path_column, need_file_column); +} + +HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri) + : pimpl(std::make_shared(context_, uri)) {} + +String HDFSSource::DisclosedGlobIterator::next() +{ + return pimpl->next(); +} + + +HDFSSource::HDFSSource( + StorageHDFSPtr storage_, + const StorageMetadataPtr & metadata_snapshot_, + ContextPtr context_, + UInt64 max_block_size_, + bool need_path_column_, + bool need_file_column_, + std::shared_ptr file_iterator_, + ColumnsDescription columns_description_) + : SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, need_path_column_, need_file_column_)) + , WithContext(context_) + , storage(std::move(storage_)) + , metadata_snapshot(metadata_snapshot_) + , max_block_size(max_block_size_) + , need_path_column(need_path_column_) + , need_file_column(need_file_column_) + , file_iterator(file_iterator_) + , columns_description(std::move(columns_description_)) +{ + initialize(); +} + +bool HDFSSource::initialize() +{ + current_path = (*file_iterator)(); + if (current_path.empty()) + return false; + const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2); + const String path_from_uri = current_path.substr(begin_of_path); + const String uri_without_path = current_path.substr(0, begin_of_path); + + auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method); + read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression); + + auto get_block_for_format = [&]() -> Block { if (storage->isColumnOriented()) - return metadata_snapshot->getSampleBlockForColumns( - columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID()); - else - return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column); - } + return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical()); + return metadata_snapshot->getSampleBlock(); + }; - HDFSSource( - StorageHDFSPtr storage_, - const StorageMetadataPtr & metadata_snapshot_, - ContextPtr context_, - UInt64 max_block_size_, - SourcesInfoPtr source_info_, - String uri_without_path_, - ColumnsDescription columns_description_) - : SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, source_info_)) - , WithContext(context_) - , storage(std::move(storage_)) - , metadata_snapshot(metadata_snapshot_) - , source_info(std::move(source_info_)) - , uri_without_path(std::move(uri_without_path_)) - , max_block_size(max_block_size_) - , columns_description(std::move(columns_description_)) - { - } + auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, get_block_for_format(), max_block_size); - 
String getName() const override + QueryPipelineBuilder builder; + builder.init(Pipe(input_format)); + if (columns_description.hasDefaults()) { - return "HDFS"; - } - - Chunk generate() override - { - while (true) + builder.addSimpleTransform([&](const Block & header) { - if (!reader) - { - auto pos = source_info->next_uri_to_read.fetch_add(1); - if (pos >= source_info->uris.size()) - return {}; + return std::make_shared(header, columns_description, *input_format, getContext()); + }); + } + pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); + reader = std::make_unique(*pipeline); + return true; +} - auto path = source_info->uris[pos]; - current_path = uri_without_path + path; +String HDFSSource::getName() const +{ + return "HDFSSource"; +} - auto compression = chooseCompressionMethod(path, storage->compression_method); - read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(uri_without_path, path, getContext()->getGlobalContext()->getConfigRef()), compression); +Chunk HDFSSource::generate() +{ + if (!reader) + return {}; - auto get_block_for_format = [&]() -> Block - { - if (storage->isColumnOriented()) - return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical()); - return metadata_snapshot->getSampleBlock(); - }; - auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, get_block_for_format(), max_block_size); + Chunk chunk; + if (reader->pull(chunk)) + { + Columns columns = chunk.getColumns(); + UInt64 num_rows = chunk.getNumRows(); - QueryPipelineBuilder builder; - builder.init(Pipe(input_format)); - if (columns_description.hasDefaults()) - { - builder.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, columns_description, *input_format, getContext()); - }); - } - pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); - reader = std::make_unique(*pipeline); - } - - Block res; - if (reader->pull(res)) - { - Columns columns = res.getColumns(); - UInt64 num_rows = res.rows(); - - /// Enrich with virtual columns. - if (source_info->need_path_column) - { - auto column = DataTypeString().createColumnConst(num_rows, current_path); - columns.push_back(column->convertToFullColumnIfConst()); - } - - if (source_info->need_file_column) - { - size_t last_slash_pos = current_path.find_last_of('/'); - auto file_name = current_path.substr(last_slash_pos + 1); - - auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name)); - columns.push_back(column->convertToFullColumnIfConst()); - } - - return Chunk(std::move(columns), num_rows); - } - - reader.reset(); - pipeline.reset(); - read_buf.reset(); + /// Enrich with virtual columns. 
+ if (need_path_column) + { + auto column = DataTypeString().createColumnConst(num_rows, current_path); + columns.push_back(column->convertToFullColumnIfConst()); } + + if (need_file_column) + { + size_t last_slash_pos = current_path.find_last_of('/'); + auto file_name = current_path.substr(last_slash_pos + 1); + + auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name)); + columns.push_back(column->convertToFullColumnIfConst()); + } + + return Chunk(std::move(columns), num_rows); } -private: - StorageHDFSPtr storage; - StorageMetadataPtr metadata_snapshot; - SourcesInfoPtr source_info; - String uri_without_path; - UInt64 max_block_size; - ColumnsDescription columns_description; + reader.reset(); + pipeline.reset(); + read_buf.reset(); + + if (!initialize()) + return {}; + return generate(); +} - std::unique_ptr read_buf; - std::unique_ptr pipeline; - std::unique_ptr reader; - String current_path; -}; class HDFSSink : public SinkToStorage { @@ -300,7 +325,6 @@ public: private: const String uri; - const String format; const Block sample_block; ContextPtr context; @@ -367,29 +391,33 @@ Pipe StorageHDFS::read( size_t max_block_size, unsigned num_streams) { - const size_t begin_of_path = uri.find('/', uri.find("//") + 2); - const String path_from_uri = uri.substr(begin_of_path); - const String uri_without_path = uri.substr(0, begin_of_path); - - HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef()); - HDFSFSPtr fs = createHDFSFS(builder.get()); - - auto sources_info = std::make_shared(); - sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri); - - if (sources_info->uris.empty()) - LOG_WARNING(log, "No file in HDFS matches the path: {}", uri); - + bool need_path_column = false; + bool need_file_column = false; for (const auto & column : column_names) { if (column == "_path") - sources_info->need_path_column = true; + need_path_column = true; if (column == "_file") - sources_info->need_file_column = true; + need_file_column = true; } - if (num_streams > sources_info->uris.size()) - num_streams = sources_info->uris.size(); + std::shared_ptr iterator_wrapper{nullptr}; + if (distributed_processing) + { + iterator_wrapper = std::make_shared( + [callback = context_->getReadTaskCallback()]() -> String { + return callback(); + }); + } + else + { + /// Iterate through disclosed globs and make a source for each file + auto glob_iterator = std::make_shared(context_, uri); + iterator_wrapper = std::make_shared([glob_iterator]() + { + return glob_iterator->next(); + }); + } Pipes pipes; auto this_ptr = std::static_pointer_cast(shared_from_this()); @@ -409,8 +437,9 @@ Pipe StorageHDFS::read( metadata_snapshot, context_, max_block_size, - sources_info, - uri_without_path, + need_path_column, + need_file_column, + iterator_wrapper, get_columns_for_format())); } return Pipe::unitePipes(std::move(pipes)); @@ -443,13 +472,13 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP } } -void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr context_, TableExclusiveLockHolder &) +void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { const size_t begin_of_path = uri.find('/', uri.find("//") + 2); const String path = uri.substr(begin_of_path); const String url = uri.substr(0, begin_of_path); - HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", 
context_->getGlobalContext()->getConfigRef()); + HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef()); HDFSFSPtr fs = createHDFSFS(builder.get()); int ret = hdfsDelete(fs.get(), path.data(), 0); @@ -488,7 +517,7 @@ void registerStorageHDFS(StorageFactory & factory) partition_by = args.storage_def->partition_by->clone(); return StorageHDFS::create( - url, args.table_id, format_name, args.columns, args.constraints, args.comment, args.getContext(), compression_method, partition_by); + url, args.table_id, format_name, args.columns, args.constraints, args.comment, args.getContext(), compression_method, false, partition_by); }, { .supports_sort_order = true, // for partition by diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index db6b078265d..f4322ec80c5 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -4,6 +4,7 @@ #if USE_HDFS +#include #include #include #include @@ -54,17 +55,82 @@ protected: const ConstraintsDescription & constraints_, const String & comment, ContextPtr context_, - const String & compression_method_, + const String & compression_method_ = "", + bool distributed_processing_ = false, ASTPtr partition_by = nullptr); private: const String uri; String format_name; String compression_method; + const bool distributed_processing; ASTPtr partition_by; Poco::Logger * log = &Poco::Logger::get("StorageHDFS"); }; + +class PullingPipelineExecutor; + +class HDFSSource : public SourceWithProgress, WithContext +{ +public: + class DisclosedGlobIterator + { + public: + DisclosedGlobIterator(ContextPtr context_, const String & uri_); + String next(); + private: + class Impl; + /// shared_ptr to have copy constructor + std::shared_ptr pimpl; + }; + + using IteratorWrapper = std::function; + using StorageHDFSPtr = std::shared_ptr; + + static Block getHeader( + const StorageMetadataPtr & metadata_snapshot, + bool need_path_column, + bool need_file_column); + + static Block getBlockForSource( + const StorageHDFSPtr & storage, + const StorageMetadataPtr & metadata_snapshot, + const ColumnsDescription & columns_description, + bool need_path_column, + bool need_file_column); + + HDFSSource( + StorageHDFSPtr storage_, + const StorageMetadataPtr & metadata_snapshot_, + ContextPtr context_, + UInt64 max_block_size_, + bool need_path_column_, + bool need_file_column_, + std::shared_ptr file_iterator_, + ColumnsDescription columns_description_); + + String getName() const override; + + Chunk generate() override; + +private: + StorageHDFSPtr storage; + StorageMetadataPtr metadata_snapshot; + UInt64 max_block_size; + bool need_path_column; + bool need_file_column; + std::shared_ptr file_iterator; + ColumnsDescription columns_description; + + std::unique_ptr read_buf; + std::unique_ptr pipeline; + std::unique_ptr reader; + String current_path; + + /// Recreate ReadBuffer and PullingPipelineExecutor for each file. 
+ bool initialize(); +}; } #endif diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp new file mode 100644 index 00000000000..ba1cc045fbf --- /dev/null +++ b/src/Storages/HDFS/StorageHDFSCluster.cpp @@ -0,0 +1,149 @@ +#include + +#if USE_HDFS + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ +StorageHDFSCluster::StorageHDFSCluster( + String cluster_name_, + const String & uri_, + const StorageID & table_id_, + const String & format_name_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & compression_method_) + : IStorage(table_id_) + , cluster_name(cluster_name_) + , uri(uri_) + , format_name(format_name_) + , compression_method(compression_method_) +{ + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); + setInMemoryMetadata(storage_metadata); +} + +/// The code executes on initiator +Pipe StorageHDFSCluster::read( + const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t /*max_block_size*/, + unsigned /*num_streams*/) +{ + auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettings()); + + auto iterator = std::make_shared(context, uri); + auto callback = std::make_shared([iterator]() mutable -> String + { + return iterator->next(); + }); + + /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) + Block header = + InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + + const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; + + Pipes pipes; + + const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; + + for (const auto & replicas : cluster->getShardsAddresses()) + { + /// There will be only one replica, because we consider each replica as a shard + for (const auto & node : replicas) + { + auto connection = std::make_shared( + node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(), + node.user, node.password, node.cluster, node.cluster_secret, + "HDFSClusterInititiator", + node.compression, + node.secure + ); + + + /// For unknown reason global context is passed to IStorage::read() method + /// So, task_identifier is passed as constructor argument. It is more obvious. 
+ auto remote_query_executor = std::make_shared( + connection, + queryToString(query_info.query), + header, + context, + /*throttler=*/nullptr, + scalars, + Tables(), + processed_stage, + RemoteQueryExecutor::Extension{.task_iterator = callback}); + + pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false)); + } + } + + metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); + return Pipe::unitePipes(std::move(pipes)); +} + +QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage( + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageMetadataPtr &, SelectQueryInfo &) const +{ + /// Initiator executes query on remote node. + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) + return QueryProcessingStage::Enum::WithMergeableState; + + /// Follower just reads the data. + return QueryProcessingStage::Enum::FetchColumns; +} + + +NamesAndTypesList StorageHDFSCluster::getVirtuals() const +{ + return NamesAndTypesList{ + {"_path", std::make_shared()}, + {"_file", std::make_shared()} + }; +} + + +} + +#endif diff --git a/src/Storages/HDFS/StorageHDFSCluster.h b/src/Storages/HDFS/StorageHDFSCluster.h new file mode 100644 index 00000000000..0e568a9faf8 --- /dev/null +++ b/src/Storages/HDFS/StorageHDFSCluster.h @@ -0,0 +1,55 @@ +#pragma once + +#include + +#if USE_HDFS + +#include +#include + +#include + +#include +#include +#include + +namespace DB +{ + +class Context; + +class StorageHDFSCluster : public shared_ptr_helper, public IStorage +{ + friend struct shared_ptr_helper; +public: + std::string getName() const override { return "HDFSCluster"; } + + Pipe read(const Names &, const StorageMetadataPtr &, SelectQueryInfo &, + ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, unsigned /*num_streams*/) override; + + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; + + NamesAndTypesList getVirtuals() const override; + +protected: + StorageHDFSCluster( + String cluster_name_, + const String & uri_, + const StorageID & table_id_, + const String & format_name_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & compression_method_); + +private: + String cluster_name; + String uri; + String format_name; + String compression_method; +}; + + +} + +#endif diff --git a/src/TableFunctions/TableFunctionHDFS.h b/src/TableFunctions/TableFunctionHDFS.h index d9ee9b47868..70bdc67efc8 100644 --- a/src/TableFunctions/TableFunctionHDFS.h +++ b/src/TableFunctions/TableFunctionHDFS.h @@ -12,7 +12,7 @@ namespace DB class Context; -/* hdfs(name_node_ip:name_node_port, format, structure) - creates a temporary storage from hdfs file +/* hdfs(URI, format, structure) - creates a temporary storage from hdfs files * */ class TableFunctionHDFS : public ITableFunctionFileLike diff --git a/src/TableFunctions/TableFunctionHDFSCluster.cpp b/src/TableFunctions/TableFunctionHDFSCluster.cpp new file mode 100644 index 00000000000..ca1ac6a11cd --- /dev/null +++ b/src/TableFunctions/TableFunctionHDFSCluster.cpp @@ -0,0 +1,116 @@ +#include + +#if USE_HDFS + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "registerTableFunctions.h" + +#include +#include + + +namespace DB +{ + +namespace 
ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + + +void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + /// Parse args + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + ASTs & args = args_func.at(0)->children; + + const auto message = fmt::format( + "The signature of table function {} shall be the following:\n" \ + " - cluster, uri, format, structure", + " - cluster, uri, format, structure, compression_method", + getName()); + + if (args.size() < 4 || args.size() > 5) + throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + /// This arguments are always the first + cluster_name = args[0]->as().value.safeGet(); + uri = args[1]->as().value.safeGet(); + format = args[2]->as().value.safeGet(); + structure = args[3]->as().value.safeGet(); + if (args.size() >= 5) + compression_method = args[4]->as().value.safeGet(); +} + + +ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr context) const +{ + return parseColumnsListFromString(structure, context); +} + +StoragePtr TableFunctionHDFSCluster::executeImpl( + const ASTPtr & /*function*/, ContextPtr context, + const std::string & table_name, ColumnsDescription /*cached_columns*/) const +{ + StoragePtr storage; + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + /// On worker node this uri won't contains globs + storage = StorageHDFS::create( + uri, + StorageID(getDatabaseName(), table_name), + format, + getActualTableStructure(context), + ConstraintsDescription{}, + String{}, + context, + compression_method, + /*distributed_processing=*/true, + nullptr); + } + else + { + storage = StorageHDFSCluster::create( + cluster_name, uri, StorageID(getDatabaseName(), table_name), + format, getActualTableStructure(context), ConstraintsDescription{}, + compression_method); + } + + storage->startup(); + + return storage; +} + + +void registerTableFunctionHDFSCluster(TableFunctionFactory & factory) +{ + factory.registerFunction(); +} + + +} + +#endif diff --git a/src/TableFunctions/TableFunctionHDFSCluster.h b/src/TableFunctions/TableFunctionHDFSCluster.h new file mode 100644 index 00000000000..58d1c3d9b05 --- /dev/null +++ b/src/TableFunctions/TableFunctionHDFSCluster.h @@ -0,0 +1,54 @@ +#pragma once + +#include + +#if USE_HDFS + +#include + + +namespace DB +{ + +class Context; + +/** + * hdfsCluster(cluster, URI, format, structure, compression_method) + * A table function, which allows to process many files from HDFS on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterics + * in HDFS file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. 
+ */ +class TableFunctionHDFSCluster : public ITableFunction +{ +public: + static constexpr auto name = "hdfsCluster"; + std::string getName() const override + { + return name; + } + bool hasStaticStructure() const override { return true; } + +protected: + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns) const override; + + const char * getStorageTypeName() const override { return "HDFSCluster"; } + + ColumnsDescription getActualTableStructure(ContextPtr) const override; + void parseArguments(const ASTPtr &, ContextPtr) override; + + String cluster_name; + String uri; + String format; + String structure; + String compression_method = "auto"; +}; + +} + +#endif diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index b4aab3e5c55..ea5c2c75f94 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -28,6 +28,7 @@ void registerTableFunctions() #if USE_HDFS registerTableFunctionHDFS(factory); + registerTableFunctionHDFSCluster(factory); #endif registerTableFunctionODBC(factory); diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 8dbb5ebb5fa..8ddd9b7c8ab 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -26,6 +26,7 @@ void registerTableFunctionCOS(TableFunctionFactory & factory); #if USE_HDFS void registerTableFunctionHDFS(TableFunctionFactory & factory); +void registerTableFunctionHDFSCluster(TableFunctionFactory & factory); #endif void registerTableFunctionODBC(TableFunctionFactory & factory); diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index ede1dafefb1..33ce94a7a29 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -323,6 +323,28 @@ def test_read_table_with_default(started_cluster): +def test_hdfsCluster(started_cluster): + hdfs_api = started_cluster.hdfs_api + fs = HdfsClient(hosts=started_cluster.hdfs_ip) + dir = '/test_hdfsCluster' + exists = fs.exists(dir) + if exists: + fs.delete(dir, recursive=True) + fs.mkdirs(dir) + hdfs_api.write_data("/test_hdfsCluster/file1", "1\n") + hdfs_api.write_data("/test_hdfsCluster/file2", "2\n") + hdfs_api.write_data("/test_hdfsCluster/file3", "3\n") + + actual = node1.query("select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id") + expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n" + assert actual == expected + + actual = node1.query("select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id") + expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n" + assert actual == expected + fs.delete(dir, recursive=True) + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")