Allow to create new files on insert for File/S3/HDFS engines

This commit is contained in:
avogar 2021-12-29 21:03:15 +03:00
parent 489a30859f
commit 97788b9c21
22 changed files with 592 additions and 65 deletions

View File

@ -605,6 +605,7 @@
M(634, MONGODB_ERROR) \
M(635, CANNOT_POLL) \
M(636, CANNOT_EXTRACT_TABLE_STRUCTURE) \
M(637, CANNOT_APPEND_TO_FILE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -75,7 +75,11 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, s3_truncate_on_insert, false, "", 0) \
M(Bool, s3_create_new_file_on_insert, false, "", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
@ -490,6 +494,7 @@ class IColumn;
\
M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, engine_file_allow_create_multiple_files, false, ".", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \

View File

@ -383,6 +383,26 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name
target = std::move(non_trivial_prefix_and_suffix_checker);
}
void FormatFactory::registerSuffixChecker(const String & name, SuffixChecker suffix_checker)
{
auto & target = dict[name].suffix_checker;
if (target)
throw Exception("FormatFactory: Suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(suffix_checker);
}
void FormatFactory::markFormatWithSuffix(const String & name)
{
registerSuffixChecker(name, [](const FormatSettings &){ return true; });
}
bool FormatFactory::checkIfFormatHasSuffix(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
{
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
auto & suffix_checker = dict[name].suffix_checker;
return suffix_checker && suffix_checker(format_settings);
}
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{
auto & target = dict[name].output_creator;

View File

@ -92,6 +92,10 @@ private:
/// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
/// Some formats can have suffix after data depending on settings.
/// The checker should return true if format will write some suffix after data.
using SuffixChecker = std::function<bool(const FormatSettings & settings)>;
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>;
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
@ -105,6 +109,7 @@ private:
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
SuffixChecker suffix_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -165,6 +170,14 @@ public:
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerSuffixChecker(const String & name, SuffixChecker suffix_checker);
/// If format always contains suffix, you an use this method instead of
/// registerSuffixChecker with suffix_checker that always returns true.
void markFormatWithSuffix(const String & name);
bool checkIfFormatHasSuffix(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);

View File

@ -93,6 +93,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
});
factory.markFormatWithSuffix("Arrow");
factory.registerOutputFormat(
"ArrowStream",
@ -103,6 +104,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
});
factory.markFormatWithSuffix("ArrowStream");
}
}

View File

@ -91,6 +91,11 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
factory.registerSuffixChecker(format_name, [](const FormatSettings & settings)
{
return !settings.custom.result_after_delimiter.empty();
});
};
registerWithNamesAndTypes("CustomSeparated", register_func);

View File

@ -284,6 +284,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
factory.markFormatWithSuffix("JSON");
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
@ -295,6 +296,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
factory.markFormatWithSuffix("JSONStrings");
}
}

View File

@ -526,6 +526,7 @@ void registerOutputFormatORC(FormatFactory & factory)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
});
factory.markFormatWithSuffix("ORC");
}
}

View File

@ -85,6 +85,7 @@ void registerOutputFormatParquet(FormatFactory & factory)
{
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
});
factory.markFormatWithSuffix("Parquet");
}
}

View File

@ -235,5 +235,19 @@ void registerOutputFormatTemplate(FormatFactory & factory)
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
});
factory.registerSuffixChecker("Template", [](const FormatSettings & settings)
{
if (settings.template_settings.resultset_format.empty())
return false;
auto resultset_format = ParsedTemplateFormatString(
FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
settings.schema.is_server, settings.schema.format_schema_path),
[&](const String & partName)
{
return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName));
});
return !resultset_format.delimiters.empty() && !resultset_format.delimiters.back().empty();
});
}
}

View File

@ -256,6 +256,7 @@ void registerOutputFormatXML(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("XML");
factory.markFormatWithSuffix("XML");
}
}

View File

@ -14,9 +14,8 @@
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
@ -28,7 +27,6 @@
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h>
#include <Formats/ReadSchemaUtils.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionsConversion.h>
@ -52,6 +50,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ACCESS_DENIED;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
namespace
@ -130,20 +129,23 @@ StorageHDFS::StorageHDFS(
ASTPtr partition_by_)
: IStorage(table_id_)
, WithContext(context_)
, uri(uri_)
, uris({uri_})
, format_name(format_name_)
, compression_method(compression_method_)
, distributed_processing(distributed_processing_)
, partition_by(partition_by_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
checkHDFSURL(uri);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
checkHDFSURL(uri_);
String path = uri_.substr(uri_.find('/', uri_.find("//") + 2));
is_path_with_globs = path.find_first_of("*?{") != std::string::npos;
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = getTableStructureFromData(format_name, uri, compression_method, context_);
auto columns = getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
}
else
@ -208,6 +210,25 @@ private:
Strings::iterator uris_iter;
};
class HDFSSource::URISIterator::Impl
{
public:
Impl(const std::vector<const String> & uris_) : uris(uris_), index(0)
{
}
String next()
{
if (index == uris.size())
return "";
return uris[index++];
}
private:
const std::vector<const String> & uris;
size_t index;
};
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{
auto header = metadata_snapshot->getSampleBlock();
@ -241,6 +262,15 @@ String HDFSSource::DisclosedGlobIterator::next()
return pimpl->next();
}
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_))
{
}
String HDFSSource::URISIterator::next()
{
return pimpl->next();
}
HDFSSource::HDFSSource(
StorageHDFSPtr storage_,
@ -275,9 +305,8 @@ bool HDFSSource::initialize()
current_path = (*file_iterator)();
if (current_path.empty())
return false;
const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2);
const String path_from_uri = current_path.substr(begin_of_path);
const String uri_without_path = current_path.substr(0, begin_of_path);
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
@ -460,15 +489,23 @@ Pipe StorageHDFS::read(
return callback();
});
}
else
else if (is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uri);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
});
}
Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
@ -496,9 +533,44 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_)
{
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
String current_uri = uris.back();
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
{
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
{
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
size_t index = uris.size();
String new_uri;
do
{
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
++index;
}
while (!hdfsExists(fs.get(), new_uri.c_str()));
uris.push_back(new_uri);
current_uri = new_uri;
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
path_from_uri);
}
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
@ -507,34 +579,37 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
{
return std::make_shared<PartitionedHDFSSink>(
partition_by_ast,
uri,
current_uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
context_,
chooseCompressionMethod(current_uri, compression_method));
}
else
{
return std::make_shared<HDFSSink>(uri,
return std::make_shared<HDFSSink>(current_uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
context_,
chooseCompressionMethod(current_uri, compression_method));
}
}
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
const String path = uri.substr(begin_of_path);
const String url = uri.substr(0, begin_of_path);
const size_t begin_of_path = uris[0].find('/', uris[0].find("//") + 2);
const String url = uris[0].substr(0, begin_of_path);
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
int ret = hdfsDelete(fs.get(), path.data(), 0);
if (ret)
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
for (const auto & uri : uris)
{
const String path = uri.substr(begin_of_path);
int ret = hdfsDelete(fs.get(), path.data(), 0);
if (ret)
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
}
}

View File

@ -31,7 +31,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
void truncate(
const ASTPtr & query,
@ -70,11 +70,12 @@ protected:
ASTPtr partition_by = nullptr);
private:
const String uri;
std::vector<const String> uris;
String format_name;
String compression_method;
const bool distributed_processing;
ASTPtr partition_by;
bool is_path_with_globs;
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
};
@ -95,6 +96,17 @@ public:
std::shared_ptr<Impl> pimpl;
};
class URISIterator
{
public:
URISIterator(const std::vector<const String> & uris_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;

View File

@ -38,11 +38,14 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String path = hdfs_uri.substr(begin_of_path);
if (path.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
// if (path.find_first_of("*?{") != std::string::npos)
// throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
//
// if (!hdfsExists(fs.get(), path.c_str()) && !truncate_)
// throw Exception(
// ErrorCodes::BAD_ARGUMENTS,
// "File {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert",
// path);
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here

View File

@ -65,6 +65,7 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT;
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@ -285,6 +286,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
{
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
is_path_with_globs = paths.size() > 1;
path_for_partitioned_write = table_path_;
setStorageMetadata(args);
}
@ -666,10 +668,9 @@ public:
}
else
{
if (paths.size() != 1)
throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
assert(!paths.empty());
flags |= O_WRONLY | O_APPEND | O_CREAT;
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
naked_buffer = std::make_unique<WriteBufferFromFile>(paths.back(), DBMS_DEFAULT_BUFFER_SIZE, flags);
}
/// In case of formats with prefixes if file is not empty we have already written prefix.
@ -827,6 +828,35 @@ SinkToStoragePtr StorageFile::write(
{
path = paths[0];
fs::create_directories(fs::path(path).parent_path());
if (is_path_with_globs)
throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
&& FormatFactory::instance().checkIfFormatHasSuffix(format_name, context, format_settings) && fs::exists(paths.back())
&& fs::file_size(paths.back()) != 0)
{
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
{
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
size_t index = paths.size();
String new_path;
do
{
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
++index;
}
while (fs::exists(new_path));
paths.push_back(new_path);
}
else
throw Exception(
ErrorCodes::CANNOT_APPEND_TO_FILE,
"Cannot append data in format {} to file, because this format contains suffix and "
"data can be written to a file only once. You can allow to create a new file "
"on each insert by enabling setting engine_file_allow_create_multiple_files",
format_name);
}
}
return std::make_shared<StorageFileSink>(
@ -882,7 +912,7 @@ void StorageFile::truncate(
ContextPtr /* context */,
TableExclusiveLockHolder &)
{
if (paths.size() != 1)
if (is_path_with_globs)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
if (use_table_fd)
@ -892,11 +922,14 @@ void StorageFile::truncate(
}
else
{
if (!fs::exists(paths[0]))
return;
for (const auto & path : paths)
{
if (!fs::exists(path))
continue;
if (0 != ::truncate(paths[0].c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE);
if (0 != ::truncate(path.c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
}
}

View File

@ -120,6 +120,8 @@ private:
size_t total_bytes_to_read = 0;
String path_for_partitioned_write;
bool is_path_with_globs = false;
};
}

View File

@ -68,7 +68,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION;
extern const int CANNOT_OPEN_FILE;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@ -176,6 +176,33 @@ String StorageS3Source::DisclosedGlobIterator::next()
return pimpl->next();
}
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(const std::vector<String> & keys_) : keys(keys_), index(0)
{
}
String next()
{
if (index == keys.size())
return "";
return keys[index++];
}
private:
const std::vector<String> & keys;
size_t index;
};
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
{
}
String StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
{
@ -296,6 +323,39 @@ Chunk StorageS3Source::generate()
return generate();
}
static bool checkIfObjectExists(const std::shared_ptr<Aws::S3::S3Client> & client, const String & bucket, const String & key)
{
bool is_finished = false;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
request.SetBucket(bucket);
request.SetPrefix(key);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(key),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
if (obj.GetKey() == key)
return true;
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return false;
}
class StorageS3Sink : public SinkToStorage
{
@ -315,9 +375,6 @@ public:
, sample_block(sample_block_)
, format_settings(format_settings_)
{
if (key.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "S3 key '{}' contains globs, so the table is in readonly mode", key);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
@ -419,7 +476,6 @@ private:
std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
static void validateBucket(const String & str)
{
@ -468,6 +524,7 @@ StorageS3::StorageS3(
ASTPtr partition_by_)
: IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, keys({uri_.key})
, format_name(format_name_)
, max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
@ -477,6 +534,7 @@ StorageS3::StorageS3(
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
@ -484,7 +542,7 @@ StorageS3::StorageS3(
updateClientAndAuthSettings(context_, client_auth);
if (columns_.empty())
{
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, format_settings, context_);
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, is_key_with_globs, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
@ -495,7 +553,7 @@ StorageS3::StorageS3(
setInMemoryMetadata(storage_metadata);
}
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context)
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
{
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
if (distributed_processing)
@ -505,13 +563,23 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
return callback();
});
}
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
else if (is_key_with_globs)
{
return glob_iterator->next();
});
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(keys);
iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]()
{
return keys_iterator->next();
});
}
}
Pipe StorageS3::read(
@ -536,7 +604,7 @@ Pipe StorageS3::read(
need_file_column = true;
}
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, distributed_processing, local_context);
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
for (size_t i = 0; i < num_streams; ++i)
{
@ -566,9 +634,38 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
{
updateClientAndAuthSettings(local_context, client_auth);
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && checkIfObjectExists(client_auth.client, client_auth.uri.bucket, keys.back()))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{
size_t index = keys.size();
auto pos = keys[0].find_first_of('.');
String new_key;
do
{
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index;
}
while (checkIfObjectExists(client_auth.client, client_auth.uri.bucket, new_key));
keys.push_back(new_key);
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
client_auth.uri.bucket,
keys.back());
}
auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos;
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@ -585,7 +682,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
keys.back(),
min_upload_part_size,
max_single_part_upload_size);
}
@ -599,7 +696,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
keys.back(),
min_upload_part_size,
max_single_part_upload_size);
}
@ -610,11 +707,17 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
{
updateClientAndAuthSettings(local_context, client_auth);
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(client_auth.uri.key);
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
Aws::S3::Model::Delete delkeys;
delkeys.AddObjects(std::move(obj));
for (const auto & key : keys)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(key);
delkeys.AddObjects(std::move(obj));
}
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(client_auth.uri.bucket);
@ -731,7 +834,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
{
ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}};
updateClientAndAuthSettings(ctx, client_auth);
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, format_settings, ctx);
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -740,12 +843,13 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
{
auto read_buffer_creator = [&]()
{
auto file_iterator = createFileIterator(client_auth, distributed_processing, ctx);
auto file_iterator = createFileIterator(client_auth, {client_auth.uri.key}, is_key_with_globs, distributed_processing, ctx);
String current_key = (*file_iterator)();
if (current_key.empty())
throw Exception(

View File

@ -44,6 +44,18 @@ public:
std::shared_ptr<Impl> pimpl;
};
class KeysIterator
{
public:
explicit KeysIterator(const std::vector<String> & keys_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
@ -174,6 +186,7 @@ private:
};
ClientAuthentication client_auth;
std::vector<String> keys;
String format_name;
UInt64 max_single_read_retries;
@ -184,10 +197,11 @@ private:
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
bool is_key_with_globs = false;
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context);
static ColumnsDescription getTableStructureFromDataImpl(
const String & format,
@ -195,6 +209,7 @@ private:
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
};

View File

@ -362,6 +362,43 @@ def test_hdfsCluster(started_cluster):
fs.delete(dir, recursive=True)
def test_overwrite(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test as {table_function}")
node1.query(f"insert into test select number, randomString(100) from numbers(5)")
node1.query_and_get_error(f"insert into test select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
result = node1.query(f"select count() from test")
assert(int(result) == 10)
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1")
result = node1.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
result = node1.query(f"drop table test_multiple_inserts")
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1")
result = node1.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -915,3 +915,45 @@ def test_empty_file(started_cluster):
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 0)
def test_overwrite(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_overwrite as {table_function}")
instance.query(f"truncate table test_overwrite")
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1")
instance.query_and_get_error(f"insert into test_overwrite select number, randomString(100) from numbers(100)")
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1")
result = instance.query(f"select count() from test_overwrite")
assert(int(result) == 200)
def test_create_new_files_on_insert(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
instance.query(f"drop table test_multiple_inserts")
table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)

View File

@ -0,0 +1,100 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

View File

@ -0,0 +1,39 @@
-- Tags: no-fasttest, no-parallel
drop table if exists test;
create table test (number UInt64) engine=File('Parquet');
insert into test select * from numbers(10);
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
truncate table test;
drop table test;
create table test (number UInt64) engine=File('Parquet', 'test_02155/test1/data.Parquet');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test2/data.1.Parquet'), 'Parquet', 'number UInt64');
create table test (number UInt64) engine=File('Parquet', 'test_02155/test3/data.Parquet.gz');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test4/data.1.Parquet.gz'), 'Parquet', 'number UInt64');