diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index a22db080374..82714de3470 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -609,6 +609,7 @@ M(638, SNAPPY_UNCOMPRESS_FAILED) \ M(639, SNAPPY_COMPRESS_FAILED) \ M(640, NO_HIVEMETASTORE) \ + M(641, CANNOT_APPEND_TO_FILE) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index d55be808aa8..0aae455d058 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -75,7 +75,11 @@ class IColumn; M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \ M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \ M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \ + M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ + M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \ M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \ + M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in hdfs engine tables", 0) \ + M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \ M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \ M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \ M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \ @@ -490,6 +494,7 @@ class IColumn; \ M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \ M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \ + M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \ M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \ M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \ M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index e00a473f584..3f220cbb20a 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -394,6 +394,27 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name target = std::move(non_trivial_prefix_and_suffix_checker); } +void FormatFactory::registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker) +{ + auto & target = dict[name].append_support_checker; + if (target) + throw Exception("FormatFactory: Append support checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR); + target = std::move(append_support_checker); +} + +void FormatFactory::markFormatHasNoAppendSupport(const String & name) +{ + registerAppendSupportChecker(name, [](const FormatSettings &){ return false; }); +} + +bool FormatFactory::checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_) +{ + auto 
format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context); + auto & append_support_checker = dict[name].append_support_checker; + /// By default we consider that the format supports append + return !append_support_checker || append_support_checker(format_settings); +} + void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator) { auto & target = dict[name].output_creator; diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index a5eaa43a29f..228d5234959 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -93,6 +93,10 @@ private: /// The checker should return true if parallel parsing should be disabled. using NonTrivialPrefixAndSuffixChecker = std::function; + /// Some formats can support append depending on settings. + /// The checker should return true if the format supports append. + using AppendSupportChecker = std::function<bool(const FormatSettings & settings)>; + using SchemaReaderCreator = std::function; using ExternalSchemaReaderCreator = std::function; @@ -106,6 +110,7 @@ private: bool supports_parallel_formatting{false}; bool is_column_oriented{false}; NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker; + AppendSupportChecker append_support_checker; }; using FormatsDictionary = std::unordered_map; @@ -167,6 +172,14 @@ public: void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker); + void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker); + + /// If a format never supports append, you can use this method instead of + /// registerAppendSupportChecker with an append_support_checker that always returns false. + void markFormatHasNoAppendSupport(const String & name); + + bool checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt); + /// Register format by its name. 
void registerInputFormat(const String & name, InputCreator input_creator); void registerOutputFormat(const String & name, OutputCreator output_creator); diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp index 692f17f843a..60408f13ff0 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp @@ -93,6 +93,7 @@ void registerOutputFormatArrow(FormatFactory & factory) { return std::make_shared(buf, sample, false, format_settings); }); + factory.markFormatHasNoAppendSupport("Arrow"); factory.registerOutputFormat( "ArrowStream", @@ -103,6 +104,7 @@ void registerOutputFormatArrow(FormatFactory & factory) { return std::make_shared(buf, sample, true, format_settings); }); + factory.markFormatHasNoAppendSupport("ArrowStream"); } } diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index fb3389475ac..70373480920 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -479,6 +479,7 @@ void registerOutputFormatAvro(FormatFactory & factory) { return std::make_shared(buf, sample, params, settings); }); + factory.markFormatHasNoAppendSupport("Avro"); } } diff --git a/src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.cpp b/src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.cpp index 21cb549d4cb..4c8cf19b923 100644 --- a/src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CustomSeparatedRowOutputFormat.cpp @@ -91,6 +91,11 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory) }); factory.markOutputFormatSupportsParallelFormatting(format_name); + + factory.registerAppendSupportChecker(format_name, [](const FormatSettings & settings) + { + return settings.custom.result_after_delimiter.empty(); + }); }; registerWithNamesAndTypes("CustomSeparated", register_func); diff --git a/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp index 8e2b2617c4c..8130b2b4cb1 100644 --- a/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp @@ -284,6 +284,7 @@ void registerOutputFormatJSON(FormatFactory & factory) }); factory.markOutputFormatSupportsParallelFormatting("JSON"); + factory.markFormatHasNoAppendSupport("JSON"); factory.registerOutputFormat("JSONStrings", []( WriteBuffer & buf, @@ -295,6 +296,7 @@ void registerOutputFormatJSON(FormatFactory & factory) }); factory.markOutputFormatSupportsParallelFormatting("JSONStrings"); + factory.markFormatHasNoAppendSupport("JSONStrings"); } } diff --git a/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp index 651b9545c81..106b71a9df5 100644 --- a/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockOutputFormat.cpp @@ -526,6 +526,7 @@ void registerOutputFormatORC(FormatFactory & factory) { return std::make_shared(buf, sample, format_settings); }); + factory.markFormatHasNoAppendSupport("ORC"); } } diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index a10858ee668..68e2ae1c6eb 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -85,6 +85,7 @@ void 
registerOutputFormatParquet(FormatFactory & factory) { return std::make_shared(buf, sample, format_settings); }); + factory.markFormatHasNoAppendSupport("Parquet"); } } diff --git a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp index d981b92e1dd..5c5b99f61da 100644 --- a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp @@ -235,5 +235,19 @@ void registerOutputFormatTemplate(FormatFactory & factory) return std::make_shared(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); }); + + factory.registerAppendSupportChecker("Template", [](const FormatSettings & settings) + { + if (settings.template_settings.resultset_format.empty()) + return true; + auto resultset_format = ParsedTemplateFormatString( + FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false, + settings.schema.is_server, settings.schema.format_schema_path), + [&](const String & partName) + { + return static_cast(TemplateBlockOutputFormat::stringToResultsetPart(partName)); + }); + return resultset_format.delimiters.empty() || resultset_format.delimiters.back().empty(); + }); } } diff --git a/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp b/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp index d96981fc091..cc2b37189f9 100644 --- a/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/XMLRowOutputFormat.cpp @@ -256,6 +256,7 @@ void registerOutputFormatXML(FormatFactory & factory) }); factory.markOutputFormatSupportsParallelFormatting("XML"); + factory.markFormatHasNoAppendSupport("XML"); } } diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 74367c81eb9..95cc4542977 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -14,9 +14,8 @@ #include #include -#include +#include -#include #include #include #include @@ -28,7 +27,6 @@ #include #include - #include #include #include @@ -52,7 +50,9 @@ namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int ACCESS_DENIED; + extern const int DATABASE_ACCESS_DENIED; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; + extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; } namespace @@ -139,20 +139,23 @@ StorageHDFS::StorageHDFS( ASTPtr partition_by_) : IStorage(table_id_) , WithContext(context_) - , uri(uri_) + , uris({uri_}) , format_name(format_name_) , compression_method(compression_method_) , distributed_processing(distributed_processing_) , partition_by(partition_by_) { - context_->getRemoteHostFilter().checkURL(Poco::URI(uri)); - checkHDFSURL(uri); + context_->getRemoteHostFilter().checkURL(Poco::URI(uri_)); + checkHDFSURL(uri_); + + String path = uri_.substr(uri_.find('/', uri_.find("//") + 2)); + is_path_with_globs = path.find_first_of("*?{") != std::string::npos; StorageInMemoryMetadata storage_metadata; if (columns_.empty()) { - auto columns = getTableStructureFromData(format_name, uri, compression_method, context_); + auto columns = getTableStructureFromData(format_name, uri_, compression_method, context_); storage_metadata.setColumns(columns); } else @@ -217,6 +220,39 @@ private: Strings::iterator uris_iter; }; +class HDFSSource::URISIterator::Impl +{ +public: + explicit Impl(const std::vector & uris_, ContextPtr context) + { + auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]); + HDFSBuilderWrapper builder 
= createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef()); + HDFSFSPtr fs = createHDFSFS(builder.get()); + for (const auto & uri : uris_) + { + path_and_uri = getPathFromUriAndUriWithoutPath(uri); + if (!hdfsExists(fs.get(), path_and_uri.first.c_str())) + uris.push_back(uri); + } + uris_iter = uris.begin(); + } + + String next() + { + std::lock_guard lock(mutex); + if (uris_iter == uris.end()) + return ""; + auto key = *uris_iter; + ++uris_iter; + return key; + } + +private: + std::mutex mutex; + Strings uris; + Strings::iterator uris_iter; +}; + Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column) { auto header = metadata_snapshot->getSampleBlock(); @@ -250,6 +286,15 @@ String HDFSSource::DisclosedGlobIterator::next() return pimpl->next(); } +HDFSSource::URISIterator::URISIterator(const std::vector & uris_, ContextPtr context) + : pimpl(std::make_shared(uris_, context)) +{ +} + +String HDFSSource::URISIterator::next() +{ + return pimpl->next(); +} HDFSSource::HDFSSource( StorageHDFSPtr storage_, @@ -284,9 +329,8 @@ bool HDFSSource::initialize() current_path = (*file_iterator)(); if (current_path.empty()) return false; - const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2); - const String path_from_uri = current_path.substr(begin_of_path); - const String uri_without_path = current_path.substr(0, begin_of_path); + + const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path); auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method); read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression); @@ -469,15 +513,23 @@ Pipe StorageHDFS::read( return callback(); }); } - else + else if (is_path_with_globs) { /// Iterate through disclosed globs and make a source for each file - auto glob_iterator = std::make_shared(context_, uri); + auto glob_iterator = std::make_shared(context_, uris[0]); iterator_wrapper = std::make_shared([glob_iterator]() { return glob_iterator->next(); }); } + else + { + auto uris_iterator = std::make_shared(uris, context_); + iterator_wrapper = std::make_shared([uris_iterator]() + { + return uris_iterator->next(); + }); + } Pipes pipes; auto this_ptr = std::static_pointer_cast(shared_from_this()); @@ -505,9 +557,11 @@ Pipe StorageHDFS::read( return Pipe::unitePipes(std::move(pipes)); } -SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/) +SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_) { - bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos; + String current_uri = uris.back(); + + bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos; const auto * insert_query = dynamic_cast(query.get()); auto partition_by_ast = insert_query ? (insert_query->partition_by ? 
insert_query->partition_by : partition_by) : nullptr; bool is_partitioned_implementation = partition_by_ast && has_wildcards; @@ -516,34 +570,70 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP { return std::make_shared( partition_by_ast, - uri, + current_uri, format_name, metadata_snapshot->getSampleBlock(), - getContext(), - chooseCompressionMethod(uri, compression_method)); + context_, + chooseCompressionMethod(current_uri, compression_method)); } else { - return std::make_shared(uri, + if (is_path_with_globs) + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back()); + + const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri); + + HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef()); + HDFSFSPtr fs = createHDFSFS(builder.get()); + + bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert; + if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str())) + { + if (context_->getSettingsRef().hdfs_create_new_file_on_insert) + { + auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/')); + size_t index = uris.size(); + String new_uri; + do + { + new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos)); + ++index; + } + while (!hdfsExists(fs.get(), new_uri.c_str())); + uris.push_back(new_uri); + current_uri = new_uri; + } + else + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, " + "if you want to create a new file on each insert, enable setting hdfs_create_new_file_on_insert", + path_from_uri); + } + + return std::make_shared(current_uri, format_name, metadata_snapshot->getSampleBlock(), - getContext(), - chooseCompressionMethod(uri, compression_method)); + context_, + chooseCompressionMethod(current_uri, compression_method)); } } void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { - const size_t begin_of_path = uri.find('/', uri.find("//") + 2); - const String path = uri.substr(begin_of_path); - const String url = uri.substr(0, begin_of_path); + const size_t begin_of_path = uris[0].find('/', uris[0].find("//") + 2); + const String url = uris[0].substr(0, begin_of_path); HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef()); HDFSFSPtr fs = createHDFSFS(builder.get()); - int ret = hdfsDelete(fs.get(), path.data(), 0); - if (ret) - throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError())); + for (const auto & uri : uris) + { + const String path = uri.substr(begin_of_path); + int ret = hdfsDelete(fs.get(), path.data(), 0); + if (ret) + throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError())); + } } diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index 9e845d8fd74..8e455189bc6 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -31,7 +31,7 @@ public: size_t max_block_size, unsigned num_streams) override; - SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override; + SinkToStoragePtr write(const ASTPtr & query, 
const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override; void truncate( const ASTPtr & query, @@ -70,11 +70,12 @@ protected: ASTPtr partition_by = nullptr); private: - const String uri; + std::vector uris; String format_name; String compression_method; const bool distributed_processing; ASTPtr partition_by; + bool is_path_with_globs; Poco::Logger * log = &Poco::Logger::get("StorageHDFS"); }; @@ -95,6 +96,17 @@ public: std::shared_ptr pimpl; }; + class URISIterator + { + public: + URISIterator(const std::vector & uris_, ContextPtr context); + String next(); + private: + class Impl; + /// shared_ptr to have copy constructor + std::shared_ptr pimpl; + }; + using IteratorWrapper = std::function; using StorageHDFSPtr = std::shared_ptr; diff --git a/src/Storages/HDFS/WriteBufferFromHDFS.cpp b/src/Storages/HDFS/WriteBufferFromHDFS.cpp index 9f5e3c1f7d2..2addfc0069f 100644 --- a/src/Storages/HDFS/WriteBufferFromHDFS.cpp +++ b/src/Storages/HDFS/WriteBufferFromHDFS.cpp @@ -15,7 +15,6 @@ namespace ErrorCodes extern const int NETWORK_ERROR; extern const int CANNOT_OPEN_FILE; extern const int CANNOT_FSYNC; -extern const int BAD_ARGUMENTS; } @@ -38,12 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); const String path = hdfs_uri.substr(begin_of_path); - if (path.find_first_of("*?{") != std::string::npos) - throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri); - - if (!hdfsExists(fs.get(), path.c_str())) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path); - fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here if (fout == nullptr) diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index a479f982c70..e6c5f25dd57 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -65,6 +65,7 @@ namespace ErrorCodes extern const int INCOMPATIBLE_COLUMNS; extern const int CANNOT_STAT; extern const int LOGICAL_ERROR; + extern const int CANNOT_APPEND_TO_FILE; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; } @@ -285,6 +286,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us { is_db_table = false; paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read); + is_path_with_globs = paths.size() > 1; path_for_partitioned_write = table_path_; setStorageMetadata(args); } @@ -603,7 +605,7 @@ public: int table_fd_, bool use_table_fd_, std::string base_path_, - std::vector paths_, + std::string path_, const CompressionMethod compression_method_, const std::optional & format_settings_, const String format_name_, @@ -615,7 +617,7 @@ public: , table_fd(table_fd_) , use_table_fd(use_table_fd_) , base_path(base_path_) - , paths(paths_) + , path(path_) , compression_method(compression_method_) , format_name(format_name_) , format_settings(format_settings_) @@ -632,7 +634,7 @@ public: int table_fd_, bool use_table_fd_, std::string base_path_, - std::vector paths_, + const std::string & path_, const CompressionMethod compression_method_, const std::optional & format_settings_, const String format_name_, @@ -644,7 +646,7 @@ public: , table_fd(table_fd_) , use_table_fd(use_table_fd_) , base_path(base_path_) - , paths(paths_) + , path(path_) , compression_method(compression_method_) , format_name(format_name_) , format_settings(format_settings_) @@ 
-666,10 +668,8 @@ public: } else { - if (paths.size() != 1) - throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED); flags |= O_WRONLY | O_APPEND | O_CREAT; - naked_buffer = std::make_unique(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags); + naked_buffer = std::make_unique(path, DBMS_DEFAULT_BUFFER_SIZE, flags); } /// In case of formats with prefixes if file is not empty we have already written prefix. @@ -709,7 +709,7 @@ private: int table_fd; bool use_table_fd; std::string base_path; - std::vector paths; + std::string path; CompressionMethod compression_method; std::string format_name; std::optional format_settings; @@ -752,7 +752,6 @@ public: { auto partition_path = PartitionedSink::replaceWildcards(path, partition_id); PartitionedSink::validatePartitionKey(partition_path, true); - Strings result_paths = {partition_path}; checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path); return std::make_shared( metadata_snapshot, @@ -760,7 +759,7 @@ public: -1, /* use_table_fd */false, base_path, - result_paths, + partition_path, compression_method, format_settings, format_name, @@ -794,7 +793,6 @@ SinkToStoragePtr StorageFile::write( int flags = 0; - std::string path; if (context->getSettingsRef().engine_file_truncate_on_insert) flags |= O_TRUNC; @@ -815,7 +813,7 @@ SinkToStoragePtr StorageFile::write( std::unique_lock{rwlock, getLockTimeout(context)}, base_path, path_for_partitioned_write, - chooseCompressionMethod(path, compression_method), + chooseCompressionMethod(path_for_partitioned_write, compression_method), format_settings, format_name, context, @@ -823,10 +821,41 @@ SinkToStoragePtr StorageFile::write( } else { + String path; if (!paths.empty()) { - path = paths[0]; + if (is_path_with_globs) + throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED); + + path = paths.back(); fs::create_directories(fs::path(path).parent_path()); + + if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs + && !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings) && fs::exists(paths.back()) + && fs::file_size(paths.back()) != 0) + { + if (context->getSettingsRef().engine_file_allow_create_multiple_files) + { + auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/')); + size_t index = paths.size(); + String new_path; + do + { + new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos)); + ++index; + } + while (fs::exists(new_path)); + paths.push_back(new_path); + path = new_path; + } + else + throw Exception( + ErrorCodes::CANNOT_APPEND_TO_FILE, + "Cannot append data in format {} to file, because this format doesn't support appends." 
+ " You can allow to create a new file " + "on each insert by enabling setting engine_file_allow_create_multiple_files", + format_name); + } } return std::make_shared( @@ -836,7 +865,7 @@ SinkToStoragePtr StorageFile::write( table_fd, use_table_fd, base_path, - paths, + path, chooseCompressionMethod(path, compression_method), format_settings, format_name, @@ -882,7 +911,7 @@ void StorageFile::truncate( ContextPtr /* context */, TableExclusiveLockHolder &) { - if (paths.size() != 1) + if (is_path_with_globs) throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED); if (use_table_fd) @@ -892,11 +921,14 @@ void StorageFile::truncate( } else { - if (!fs::exists(paths[0])) - return; + for (const auto & path : paths) + { + if (!fs::exists(path)) + continue; - if (0 != ::truncate(paths[0].c_str(), 0)) - throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE); + if (0 != ::truncate(path.c_str(), 0)) + throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE); + } } } diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 6b015976589..8564d93ccc2 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -120,6 +120,8 @@ private: size_t total_bytes_to_read = 0; String path_for_partitioned_write; + + bool is_path_with_globs = false; }; } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index fda6333d614..e2294aaabc5 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -68,7 +68,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int S3_ERROR; extern const int UNEXPECTED_EXPRESSION; - extern const int CANNOT_OPEN_FILE; + extern const int DATABASE_ACCESS_DENIED; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; } @@ -82,8 +82,6 @@ public: Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_) : client(client_), globbed_uri(globbed_uri_) { - std::lock_guard lock(mutex); - if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos) throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION); @@ -176,6 +174,37 @@ String StorageS3Source::DisclosedGlobIterator::next() return pimpl->next(); } +class StorageS3Source::KeysIterator::Impl +{ +public: + explicit Impl(const std::vector & keys_) : keys(keys_), keys_iter(keys.begin()) + { + } + + String next() + { + std::lock_guard lock(mutex); + if (keys_iter == keys.end()) + return ""; + auto key = *keys_iter; + ++keys_iter; + return key; + } + +private: + std::mutex mutex; + Strings keys; + Strings::iterator keys_iter; +}; + +StorageS3Source::KeysIterator::KeysIterator(const std::vector & keys_) : pimpl(std::make_shared(keys_)) +{ +} + +String StorageS3Source::KeysIterator::next() +{ + return pimpl->next(); +} Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column) { @@ -296,6 +325,39 @@ Chunk StorageS3Source::generate() return generate(); } +static bool checkIfObjectExists(const std::shared_ptr & client, const String & bucket, const String & key) +{ + bool is_finished = false; + Aws::S3::Model::ListObjectsV2Request request; + Aws::S3::Model::ListObjectsV2Outcome outcome; + + request.SetBucket(bucket); + request.SetPrefix(key); + while (!is_finished) + { + outcome = client->ListObjectsV2(request); + if (!outcome.IsSuccess()) + throw Exception( + ErrorCodes::S3_ERROR, + "Could not 
list objects in bucket {} with key {}, S3 exception: {}, message: {}", + quoteString(bucket), + quoteString(key), + backQuote(outcome.GetError().GetExceptionName()), + quoteString(outcome.GetError().GetMessage())); + + const auto & result_batch = outcome.GetResult().GetContents(); + for (const auto & obj : result_batch) + { + if (obj.GetKey() == key) + return true; + } + + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + is_finished = !outcome.GetResult().GetIsTruncated(); + } + + return false; +} class StorageS3Sink : public SinkToStorage { @@ -315,9 +377,6 @@ public: , sample_block(sample_block_) , format_settings(format_settings_) { - if (key.find_first_of("*?{") != std::string::npos) - throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "S3 key '{}' contains globs, so the table is in readonly mode", key); - write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings); @@ -419,7 +478,6 @@ private: std::optional format_settings; ExpressionActionsPtr partition_by_expr; - String partition_by_column_name; static void validateBucket(const String & str) { @@ -468,6 +526,7 @@ StorageS3::StorageS3( ASTPtr partition_by_) : IStorage(table_id_) , client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later + , keys({uri_.key}) , format_name(format_name_) , max_single_read_retries(max_single_read_retries_) , min_upload_part_size(min_upload_part_size_) @@ -477,6 +536,7 @@ StorageS3::StorageS3( , distributed_processing(distributed_processing_) , format_settings(format_settings_) , partition_by(partition_by_) + , is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos) { context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri); StorageInMemoryMetadata storage_metadata; @@ -484,7 +544,7 @@ StorageS3::StorageS3( updateClientAndAuthSettings(context_, client_auth); if (columns_.empty()) { - auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, format_settings, context_); + auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, is_key_with_globs, format_settings, context_); storage_metadata.setColumns(columns); } else @@ -495,9 +555,8 @@ StorageS3::StorageS3( setInMemoryMetadata(storage_metadata); } -std::shared_ptr StorageS3::createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context) +std::shared_ptr StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context) { - std::shared_ptr iterator_wrapper{nullptr}; if (distributed_processing) { return std::make_shared( @@ -505,13 +564,23 @@ std::shared_ptr StorageS3::createFileIterator( return callback(); }); } - - /// Iterate through disclosed globs and make a source for each file - auto glob_iterator = std::make_shared(*client_auth.client, client_auth.uri); - return std::make_shared([glob_iterator]() + else if (is_key_with_globs) { - return glob_iterator->next(); - }); + /// Iterate through disclosed globs and make a source for each file + auto glob_iterator = 
std::make_shared(*client_auth.client, client_auth.uri); + return std::make_shared([glob_iterator]() + { + return glob_iterator->next(); + }); + } + else + { + auto keys_iterator = std::make_shared(keys); + return std::make_shared([keys_iterator]() + { + return keys_iterator->next(); + }); + } } Pipe StorageS3::read( @@ -536,7 +605,7 @@ Pipe StorageS3::read( need_file_column = true; } - std::shared_ptr iterator_wrapper = createFileIterator(client_auth, distributed_processing, local_context); + std::shared_ptr iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context); for (size_t i = 0; i < num_streams; ++i) { @@ -567,8 +636,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr updateClientAndAuthSettings(local_context, client_auth); auto sample_block = metadata_snapshot->getSampleBlock(); - auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method); - bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos; + auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method); + bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; auto insert_query = std::dynamic_pointer_cast(query); auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; @@ -585,12 +654,41 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr chosen_compression_method, client_auth.client, client_auth.uri.bucket, - client_auth.uri.key, + keys.back(), min_upload_part_size, max_single_part_upload_size); } else { + if (is_key_with_globs) + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key); + + bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; + + if (!truncate_in_insert && checkIfObjectExists(client_auth.client, client_auth.uri.bucket, keys.back())) + { + if (local_context->getSettingsRef().s3_create_new_file_on_insert) + { + size_t index = keys.size(); + auto pos = keys[0].find_first_of('.'); + String new_key; + do + { + new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos)); + ++index; + } + while (checkIfObjectExists(client_auth.client, client_auth.uri.bucket, new_key)); + keys.push_back(new_key); + } + else + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Object in bucket {} with key {} already exists. 
If you want to overwrite it, enable setting s3_truncate_on_insert, if you " + "want to create a new file on each insert, enable setting s3_create_new_file_on_insert", + client_auth.uri.bucket, + keys.back()); + } + return std::make_shared( format_name, sample_block, @@ -599,7 +697,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr chosen_compression_method, client_auth.client, client_auth.uri.bucket, - client_auth.uri.key, + keys.back(), min_upload_part_size, max_single_part_upload_size); } @@ -610,11 +708,17 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, { updateClientAndAuthSettings(local_context, client_auth); - Aws::S3::Model::ObjectIdentifier obj; - obj.SetKey(client_auth.uri.key); + if (is_key_with_globs) + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key); Aws::S3::Model::Delete delkeys; - delkeys.AddObjects(std::move(obj)); + + for (const auto & key : keys) + { + Aws::S3::Model::ObjectIdentifier obj; + obj.SetKey(key); + delkeys.AddObjects(std::move(obj)); + } Aws::S3::Model::DeleteObjectsRequest request; request.SetBucket(client_auth.uri.bucket); @@ -734,7 +838,7 @@ ColumnsDescription StorageS3::getTableStructureFromData( { ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}}; updateClientAndAuthSettings(ctx, client_auth); - return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, format_settings, ctx); + return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx); } ColumnsDescription StorageS3::getTableStructureFromDataImpl( @@ -743,12 +847,14 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( UInt64 max_single_read_retries, const String & compression_method, bool distributed_processing, + bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx) { + std::vector keys = {client_auth.uri.key}; auto read_buffer_creator = [&]() { - auto file_iterator = createFileIterator(client_auth, distributed_processing, ctx); + auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx); String current_key = (*file_iterator)(); if (current_key.empty()) throw Exception( diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 0690040915d..1b766676427 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -44,6 +44,18 @@ public: std::shared_ptr pimpl; }; + class KeysIterator + { + public: + explicit KeysIterator(const std::vector & keys_); + String next(); + + private: + class Impl; + /// shared_ptr to have copy constructor + std::shared_ptr pimpl; + }; + using IteratorWrapper = std::function; static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column); @@ -174,6 +186,7 @@ private: }; ClientAuthentication client_auth; + std::vector keys; String format_name; UInt64 max_single_read_retries; @@ -184,10 +197,11 @@ private: const bool distributed_processing; std::optional format_settings; ASTPtr partition_by; + bool is_key_with_globs = false; static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &); - static std::shared_ptr createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context); + 
static std::shared_ptr createFileIterator(const ClientAuthentication & client_auth, const std::vector & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context); static ColumnsDescription getTableStructureFromDataImpl( const String & format, @@ -195,6 +209,7 @@ private: UInt64 max_single_read_retries, const String & compression_method, bool distributed_processing, + bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx); }; diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 3c7104749a9..b0836a38c9e 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -366,6 +366,43 @@ def test_hdfs_directory_not_exist(started_cluster): node1.query(ddl) assert "" == node1.query("select * from HDFSStorageWithNotExistDir") +def test_overwrite(started_cluster): + hdfs_api = started_cluster.hdfs_api + + table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')" + node1.query(f"create table test_overwrite as {table_function}") + node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(5)") + node1.query_and_get_error(f"insert into test_overwrite select number, randomString(100) FROM numbers(10)") + node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1") + + result = node1.query(f"select count() from test_overwrite") + assert(int(result) == 10) + + +def test_multiple_inserts(started_cluster): + hdfs_api = started_cluster.hdfs_api + + table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')" + node1.query(f"create table test_multiple_inserts as {table_function}") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1") + + result = node1.query(f"select count() from test_multiple_inserts") + assert(int(result) == 60) + + result = node1.query(f"drop table test_multiple_inserts") + + table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')" + node1.query(f"create table test_multiple_inserts as {table_function}") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1") + node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1") + + result = node1.query(f"select count() from test_multiple_inserts") + assert(int(result) == 60) + + def test_format_detection(started_cluster): node1.query(f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')") node1.query(f"insert into arrow_table select 1") diff --git a/tests/integration/test_storage_s3/configs/named_collections.xml b/tests/integration/test_storage_s3/configs/named_collections.xml index ef21ced4d0c..f22440d17c9 100644 --- a/tests/integration/test_storage_s3/configs/named_collections.xml +++ b/tests/integration/test_storage_s3/configs/named_collections.xml @@ -10,6 +10,11 @@ 
minio minio123 + + http://minio1:9001/root/test_parquet_gz + minio + minio123 + http://minio1:9001/root/test_orc minio diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 0584ccf79b0..a804053d4fd 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -136,7 +136,7 @@ def test_put(started_cluster, maybe_auth, positive, compression): values_csv = "1,2,3\n3,2,1\n78,43,45\n" filename = "test.csv" put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}', - {maybe_auth}'CSV', '{table_format}', {compression}) values {values}""" + {maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}""" try: run_query(instance, put_query) @@ -298,7 +298,7 @@ def test_put_csv(started_cluster, maybe_auth, positive): instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" filename = "test.csv" - put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format( + put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV settings s3_truncate_on_insert=1".format( started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format) csv_data = "8,9,16\n11,18,13\n22,14,2\n" @@ -322,7 +322,7 @@ def test_put_get_with_redirect(started_cluster): values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)" values_csv = "1,1,1\n1,1,1\n11,11,11\n" filename = "test.csv" - query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format( + query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format( started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values) run_query(instance, query) @@ -350,12 +350,12 @@ def test_put_with_zero_redirect(started_cluster): filename = "test.csv" # Should work without redirect - query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format( + query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format( started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values) run_query(instance, query) # Should not work with redirect - query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format( + query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format( started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values) exception_raised = False try: @@ -805,13 +805,13 @@ def test_seekable_formats(started_cluster): instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')" - instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)") + instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1") result = instance.query(f"SELECT count() FROM {table_function}") assert(int(result) == 5000000) table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" - 
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)") + exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1") result = instance.query(f"SELECT count() FROM {table_function}") assert(int(result) == 5000000) @@ -827,14 +827,14 @@ def test_seekable_formats_url(started_cluster): instance = started_cluster.instances["dummy"] table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')" - instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)") + instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1") table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')" result = instance.query(f"SELECT count() FROM {table_function}") assert(int(result) == 5000000) table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')" - exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)") + exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1") table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')" result = instance.query(f"SELECT count() FROM {table_function}") @@ -917,6 +917,48 @@ def test_empty_file(started_cluster): assert(int(result) == 0) +def test_overwrite(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')" + instance.query(f"create table test_overwrite as {table_function}") + instance.query(f"truncate table test_overwrite") + instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1") + instance.query_and_get_error(f"insert into test_overwrite select number, randomString(100) from numbers(100)") + instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1") + + result = instance.query(f"select count() from test_overwrite") + assert(int(result) == 200) + + +def test_create_new_files_on_insert(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')" + instance.query(f"create table test_multiple_inserts as {table_function}") + instance.query(f"truncate table test_multiple_inserts") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1") + + result = instance.query(f"select count() from test_multiple_inserts") + assert(int(result) == 60) + + instance.query(f"drop table test_multiple_inserts") + + 
table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')" + instance.query(f"create table test_multiple_inserts as {table_function}") + instance.query(f"truncate table test_multiple_inserts") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1") + instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1") + + result = instance.query(f"select count() from test_multiple_inserts") + assert(int(result) == 60) + + def test_format_detection(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] diff --git a/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.reference b/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.reference new file mode 100644 index 00000000000..beeb89a5947 --- /dev/null +++ b/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.reference @@ -0,0 +1,100 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 diff --git a/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.sql b/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.sql new file mode 100644 index 00000000000..7947536bc79 --- /dev/null +++ b/tests/queries/0_stateless/02155_multiple_inserts_for_formats_with_suffix.sql @@ -0,0 +1,39 @@ +-- Tags: no-fasttest, no-parallel + +drop table if exists test; +create table test (number UInt64) engine=File('Parquet'); +insert into test select * from numbers(10); +insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1; +select * from test order by number; +truncate table test; +drop table test; + +create table test (number UInt64) engine=File('Parquet', 'test_02155/test1/data.Parquet'); +insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1; +insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1; +select * from test order by number; +drop table test; + + +insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1; +insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1; +select * from file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64'); +select * from file(concat(currentDatabase(), '/test2/data.1.Parquet'), 'Parquet', 'number UInt64'); + +create 
table test (number UInt64) engine=File('Parquet', 'test_02155/test3/data.Parquet.gz'); +insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1; +; +insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1; +select * from test order by number; +drop table test; + +insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1; +insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1; +select * from file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64'); +select * from file(concat(currentDatabase(), '/test4/data.1.Parquet.gz'), 'Parquet', 'number UInt64'); + diff --git a/tests/queries/0_stateless/02168_avro_bug.sql b/tests/queries/0_stateless/02168_avro_bug.sql index 78eedf3258e..ac98119845f 100644 --- a/tests/queries/0_stateless/02168_avro_bug.sql +++ b/tests/queries/0_stateless/02168_avro_bug.sql @@ -1,5 +1,5 @@ --- Tags: no-fasttest -insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10); -insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10); -insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10); +-- Tags: no-fasttest, no-parallel +insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); +insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE } +insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE } select 'OK';