mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #33302 from Avogar/formats-with-suffixes
Allow to create new files on insert for File/S3/HDFS engines
This commit is contained in:
commit
9f12f4af13
@ -609,6 +609,7 @@
|
|||||||
M(638, SNAPPY_UNCOMPRESS_FAILED) \
|
M(638, SNAPPY_UNCOMPRESS_FAILED) \
|
||||||
M(639, SNAPPY_COMPRESS_FAILED) \
|
M(639, SNAPPY_COMPRESS_FAILED) \
|
||||||
M(640, NO_HIVEMETASTORE) \
|
M(640, NO_HIVEMETASTORE) \
|
||||||
|
M(641, CANNOT_APPEND_TO_FILE) \
|
||||||
\
|
\
|
||||||
M(999, KEEPER_EXCEPTION) \
|
M(999, KEEPER_EXCEPTION) \
|
||||||
M(1000, POCO_EXCEPTION) \
|
M(1000, POCO_EXCEPTION) \
|
||||||
|
@ -75,7 +75,11 @@ class IColumn;
|
|||||||
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
|
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
|
||||||
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
|
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
|
||||||
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
|
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
|
||||||
|
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
|
||||||
|
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
|
||||||
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
|
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
|
||||||
|
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
|
||||||
|
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
|
||||||
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
|
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
|
||||||
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
|
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
|
||||||
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
|
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
|
||||||
@ -490,6 +494,7 @@ class IColumn;
|
|||||||
\
|
\
|
||||||
M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \
|
M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \
|
||||||
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
|
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
|
||||||
|
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
|
||||||
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
|
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
|
||||||
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
|
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
|
||||||
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
|
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
|
||||||
|
@ -394,6 +394,27 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name
|
|||||||
target = std::move(non_trivial_prefix_and_suffix_checker);
|
target = std::move(non_trivial_prefix_and_suffix_checker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FormatFactory::registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker)
|
||||||
|
{
|
||||||
|
auto & target = dict[name].append_support_checker;
|
||||||
|
if (target)
|
||||||
|
throw Exception("FormatFactory: Suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
target = std::move(append_support_checker);
|
||||||
|
}
|
||||||
|
|
||||||
|
void FormatFactory::markFormatHasNoAppendSupport(const String & name)
|
||||||
|
{
|
||||||
|
registerAppendSupportChecker(name, [](const FormatSettings &){ return false; });
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FormatFactory::checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
|
||||||
|
{
|
||||||
|
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
|
||||||
|
auto & append_support_checker = dict[name].append_support_checker;
|
||||||
|
/// By default we consider that format supports append
|
||||||
|
return !append_support_checker || append_support_checker(format_settings);
|
||||||
|
}
|
||||||
|
|
||||||
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
|
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
|
||||||
{
|
{
|
||||||
auto & target = dict[name].output_creator;
|
auto & target = dict[name].output_creator;
|
||||||
|
@ -93,6 +93,10 @@ private:
|
|||||||
/// The checker should return true if parallel parsing should be disabled.
|
/// The checker should return true if parallel parsing should be disabled.
|
||||||
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
|
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
|
||||||
|
|
||||||
|
/// Some formats can support append depending on settings.
|
||||||
|
/// The checker should return true if format support append.
|
||||||
|
using AppendSupportChecker = std::function<bool(const FormatSettings & settings)>;
|
||||||
|
|
||||||
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>;
|
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>;
|
||||||
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
|
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
|
||||||
|
|
||||||
@ -106,6 +110,7 @@ private:
|
|||||||
bool supports_parallel_formatting{false};
|
bool supports_parallel_formatting{false};
|
||||||
bool is_column_oriented{false};
|
bool is_column_oriented{false};
|
||||||
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
|
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
|
||||||
|
AppendSupportChecker append_support_checker;
|
||||||
};
|
};
|
||||||
|
|
||||||
using FormatsDictionary = std::unordered_map<String, Creators>;
|
using FormatsDictionary = std::unordered_map<String, Creators>;
|
||||||
@ -167,6 +172,14 @@ public:
|
|||||||
|
|
||||||
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
|
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
|
||||||
|
|
||||||
|
void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
|
||||||
|
|
||||||
|
/// If format always doesn't support append, you can use this method instead of
|
||||||
|
/// registerAppendSupportChecker with append_support_checker that always returns true.
|
||||||
|
void markFormatHasNoAppendSupport(const String & name);
|
||||||
|
|
||||||
|
bool checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
|
||||||
|
|
||||||
/// Register format by its name.
|
/// Register format by its name.
|
||||||
void registerInputFormat(const String & name, InputCreator input_creator);
|
void registerInputFormat(const String & name, InputCreator input_creator);
|
||||||
void registerOutputFormat(const String & name, OutputCreator output_creator);
|
void registerOutputFormat(const String & name, OutputCreator output_creator);
|
||||||
|
@ -93,6 +93,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
|
|||||||
{
|
{
|
||||||
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
|
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
|
||||||
});
|
});
|
||||||
|
factory.markFormatHasNoAppendSupport("Arrow");
|
||||||
|
|
||||||
factory.registerOutputFormat(
|
factory.registerOutputFormat(
|
||||||
"ArrowStream",
|
"ArrowStream",
|
||||||
@ -103,6 +104,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
|
|||||||
{
|
{
|
||||||
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
|
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
|
||||||
});
|
});
|
||||||
|
factory.markFormatHasNoAppendSupport("ArrowStream");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -479,6 +479,7 @@ void registerOutputFormatAvro(FormatFactory & factory)
|
|||||||
{
|
{
|
||||||
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
|
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
|
||||||
});
|
});
|
||||||
|
factory.markFormatHasNoAppendSupport("Avro");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,11 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
|
|||||||
});
|
});
|
||||||
|
|
||||||
factory.markOutputFormatSupportsParallelFormatting(format_name);
|
factory.markOutputFormatSupportsParallelFormatting(format_name);
|
||||||
|
|
||||||
|
factory.registerAppendSupportChecker(format_name, [](const FormatSettings & settings)
|
||||||
|
{
|
||||||
|
return settings.custom.result_after_delimiter.empty();
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
registerWithNamesAndTypes("CustomSeparated", register_func);
|
registerWithNamesAndTypes("CustomSeparated", register_func);
|
||||||
|
@ -284,6 +284,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
|
|||||||
});
|
});
|
||||||
|
|
||||||
factory.markOutputFormatSupportsParallelFormatting("JSON");
|
factory.markOutputFormatSupportsParallelFormatting("JSON");
|
||||||
|
factory.markFormatHasNoAppendSupport("JSON");
|
||||||
|
|
||||||
factory.registerOutputFormat("JSONStrings", [](
|
factory.registerOutputFormat("JSONStrings", [](
|
||||||
WriteBuffer & buf,
|
WriteBuffer & buf,
|
||||||
@ -295,6 +296,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
|
|||||||
});
|
});
|
||||||
|
|
||||||
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
|
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
|
||||||
|
factory.markFormatHasNoAppendSupport("JSONStrings");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -526,6 +526,7 @@ void registerOutputFormatORC(FormatFactory & factory)
|
|||||||
{
|
{
|
||||||
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
|
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
|
||||||
});
|
});
|
||||||
|
factory.markFormatHasNoAppendSupport("ORC");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -85,6 +85,7 @@ void registerOutputFormatParquet(FormatFactory & factory)
|
|||||||
{
|
{
|
||||||
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
|
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
|
||||||
});
|
});
|
||||||
|
factory.markFormatHasNoAppendSupport("Parquet");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -235,5 +235,19 @@ void registerOutputFormatTemplate(FormatFactory & factory)
|
|||||||
|
|
||||||
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
|
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
factory.registerAppendSupportChecker("Template", [](const FormatSettings & settings)
|
||||||
|
{
|
||||||
|
if (settings.template_settings.resultset_format.empty())
|
||||||
|
return true;
|
||||||
|
auto resultset_format = ParsedTemplateFormatString(
|
||||||
|
FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
|
||||||
|
settings.schema.is_server, settings.schema.format_schema_path),
|
||||||
|
[&](const String & partName)
|
||||||
|
{
|
||||||
|
return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName));
|
||||||
|
});
|
||||||
|
return resultset_format.delimiters.empty() || resultset_format.delimiters.back().empty();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -256,6 +256,7 @@ void registerOutputFormatXML(FormatFactory & factory)
|
|||||||
});
|
});
|
||||||
|
|
||||||
factory.markOutputFormatSupportsParallelFormatting("XML");
|
factory.markOutputFormatSupportsParallelFormatting("XML");
|
||||||
|
factory.markFormatHasNoAppendSupport("XML");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,9 +14,8 @@
|
|||||||
#include <Processors/Formats/IInputFormat.h>
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||||
|
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
|
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <Interpreters/evaluateConstantExpression.h>
|
#include <Interpreters/evaluateConstantExpression.h>
|
||||||
#include <Interpreters/ExpressionAnalyzer.h>
|
#include <Interpreters/ExpressionAnalyzer.h>
|
||||||
#include <Interpreters/TreeRewriter.h>
|
#include <Interpreters/TreeRewriter.h>
|
||||||
@ -28,7 +27,6 @@
|
|||||||
#include <Storages/HDFS/WriteBufferFromHDFS.h>
|
#include <Storages/HDFS/WriteBufferFromHDFS.h>
|
||||||
#include <Storages/PartitionedSink.h>
|
#include <Storages/PartitionedSink.h>
|
||||||
|
|
||||||
|
|
||||||
#include <Formats/ReadSchemaUtils.h>
|
#include <Formats/ReadSchemaUtils.h>
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <Functions/FunctionsConversion.h>
|
#include <Functions/FunctionsConversion.h>
|
||||||
@ -52,7 +50,9 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int ACCESS_DENIED;
|
extern const int ACCESS_DENIED;
|
||||||
|
extern const int DATABASE_ACCESS_DENIED;
|
||||||
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
}
|
}
|
||||||
namespace
|
namespace
|
||||||
@ -139,20 +139,23 @@ StorageHDFS::StorageHDFS(
|
|||||||
ASTPtr partition_by_)
|
ASTPtr partition_by_)
|
||||||
: IStorage(table_id_)
|
: IStorage(table_id_)
|
||||||
, WithContext(context_)
|
, WithContext(context_)
|
||||||
, uri(uri_)
|
, uris({uri_})
|
||||||
, format_name(format_name_)
|
, format_name(format_name_)
|
||||||
, compression_method(compression_method_)
|
, compression_method(compression_method_)
|
||||||
, distributed_processing(distributed_processing_)
|
, distributed_processing(distributed_processing_)
|
||||||
, partition_by(partition_by_)
|
, partition_by(partition_by_)
|
||||||
{
|
{
|
||||||
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
|
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
|
||||||
checkHDFSURL(uri);
|
checkHDFSURL(uri_);
|
||||||
|
|
||||||
|
String path = uri_.substr(uri_.find('/', uri_.find("//") + 2));
|
||||||
|
is_path_with_globs = path.find_first_of("*?{") != std::string::npos;
|
||||||
|
|
||||||
StorageInMemoryMetadata storage_metadata;
|
StorageInMemoryMetadata storage_metadata;
|
||||||
|
|
||||||
if (columns_.empty())
|
if (columns_.empty())
|
||||||
{
|
{
|
||||||
auto columns = getTableStructureFromData(format_name, uri, compression_method, context_);
|
auto columns = getTableStructureFromData(format_name, uri_, compression_method, context_);
|
||||||
storage_metadata.setColumns(columns);
|
storage_metadata.setColumns(columns);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -217,6 +220,39 @@ private:
|
|||||||
Strings::iterator uris_iter;
|
Strings::iterator uris_iter;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class HDFSSource::URISIterator::Impl
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit Impl(const std::vector<const String> & uris_, ContextPtr context)
|
||||||
|
{
|
||||||
|
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
|
||||||
|
HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
|
||||||
|
HDFSFSPtr fs = createHDFSFS(builder.get());
|
||||||
|
for (const auto & uri : uris_)
|
||||||
|
{
|
||||||
|
path_and_uri = getPathFromUriAndUriWithoutPath(uri);
|
||||||
|
if (!hdfsExists(fs.get(), path_and_uri.first.c_str()))
|
||||||
|
uris.push_back(uri);
|
||||||
|
}
|
||||||
|
uris_iter = uris.begin();
|
||||||
|
}
|
||||||
|
|
||||||
|
String next()
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
if (uris_iter == uris.end())
|
||||||
|
return "";
|
||||||
|
auto key = *uris_iter;
|
||||||
|
++uris_iter;
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex mutex;
|
||||||
|
Strings uris;
|
||||||
|
Strings::iterator uris_iter;
|
||||||
|
};
|
||||||
|
|
||||||
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
|
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
|
||||||
{
|
{
|
||||||
auto header = metadata_snapshot->getSampleBlock();
|
auto header = metadata_snapshot->getSampleBlock();
|
||||||
@ -250,6 +286,15 @@ String HDFSSource::DisclosedGlobIterator::next()
|
|||||||
return pimpl->next();
|
return pimpl->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_, ContextPtr context)
|
||||||
|
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
String HDFSSource::URISIterator::next()
|
||||||
|
{
|
||||||
|
return pimpl->next();
|
||||||
|
}
|
||||||
|
|
||||||
HDFSSource::HDFSSource(
|
HDFSSource::HDFSSource(
|
||||||
StorageHDFSPtr storage_,
|
StorageHDFSPtr storage_,
|
||||||
@ -284,9 +329,8 @@ bool HDFSSource::initialize()
|
|||||||
current_path = (*file_iterator)();
|
current_path = (*file_iterator)();
|
||||||
if (current_path.empty())
|
if (current_path.empty())
|
||||||
return false;
|
return false;
|
||||||
const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2);
|
|
||||||
const String path_from_uri = current_path.substr(begin_of_path);
|
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
|
||||||
const String uri_without_path = current_path.substr(0, begin_of_path);
|
|
||||||
|
|
||||||
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
|
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
|
||||||
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
|
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
|
||||||
@ -469,15 +513,23 @@ Pipe StorageHDFS::read(
|
|||||||
return callback();
|
return callback();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else
|
else if (is_path_with_globs)
|
||||||
{
|
{
|
||||||
/// Iterate through disclosed globs and make a source for each file
|
/// Iterate through disclosed globs and make a source for each file
|
||||||
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uri);
|
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
|
||||||
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
|
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
|
||||||
{
|
{
|
||||||
return glob_iterator->next();
|
return glob_iterator->next();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
|
||||||
|
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
|
||||||
|
{
|
||||||
|
return uris_iterator->next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
|
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
|
||||||
@ -505,9 +557,11 @@ Pipe StorageHDFS::read(
|
|||||||
return Pipe::unitePipes(std::move(pipes));
|
return Pipe::unitePipes(std::move(pipes));
|
||||||
}
|
}
|
||||||
|
|
||||||
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
|
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_)
|
||||||
{
|
{
|
||||||
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
|
String current_uri = uris.back();
|
||||||
|
|
||||||
|
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
|
||||||
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
|
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
|
||||||
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
||||||
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
|
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
|
||||||
@ -516,34 +570,70 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
|
|||||||
{
|
{
|
||||||
return std::make_shared<PartitionedHDFSSink>(
|
return std::make_shared<PartitionedHDFSSink>(
|
||||||
partition_by_ast,
|
partition_by_ast,
|
||||||
uri,
|
current_uri,
|
||||||
format_name,
|
format_name,
|
||||||
metadata_snapshot->getSampleBlock(),
|
metadata_snapshot->getSampleBlock(),
|
||||||
getContext(),
|
context_,
|
||||||
chooseCompressionMethod(uri, compression_method));
|
chooseCompressionMethod(current_uri, compression_method));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return std::make_shared<HDFSSink>(uri,
|
if (is_path_with_globs)
|
||||||
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
|
||||||
|
|
||||||
|
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
|
||||||
|
|
||||||
|
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
|
||||||
|
HDFSFSPtr fs = createHDFSFS(builder.get());
|
||||||
|
|
||||||
|
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
|
||||||
|
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
|
||||||
|
{
|
||||||
|
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
|
||||||
|
{
|
||||||
|
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
|
||||||
|
size_t index = uris.size();
|
||||||
|
String new_uri;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
|
||||||
|
++index;
|
||||||
|
}
|
||||||
|
while (!hdfsExists(fs.get(), new_uri.c_str()));
|
||||||
|
uris.push_back(new_uri);
|
||||||
|
current_uri = new_uri;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::BAD_ARGUMENTS,
|
||||||
|
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
|
||||||
|
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
|
||||||
|
path_from_uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<HDFSSink>(current_uri,
|
||||||
format_name,
|
format_name,
|
||||||
metadata_snapshot->getSampleBlock(),
|
metadata_snapshot->getSampleBlock(),
|
||||||
getContext(),
|
context_,
|
||||||
chooseCompressionMethod(uri, compression_method));
|
chooseCompressionMethod(current_uri, compression_method));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
|
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
|
const size_t begin_of_path = uris[0].find('/', uris[0].find("//") + 2);
|
||||||
const String path = uri.substr(begin_of_path);
|
const String url = uris[0].substr(0, begin_of_path);
|
||||||
const String url = uri.substr(0, begin_of_path);
|
|
||||||
|
|
||||||
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
|
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
|
||||||
HDFSFSPtr fs = createHDFSFS(builder.get());
|
HDFSFSPtr fs = createHDFSFS(builder.get());
|
||||||
|
|
||||||
int ret = hdfsDelete(fs.get(), path.data(), 0);
|
for (const auto & uri : uris)
|
||||||
if (ret)
|
{
|
||||||
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
|
const String path = uri.substr(begin_of_path);
|
||||||
|
int ret = hdfsDelete(fs.get(), path.data(), 0);
|
||||||
|
if (ret)
|
||||||
|
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ public:
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams) override;
|
unsigned num_streams) override;
|
||||||
|
|
||||||
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override;
|
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
|
||||||
|
|
||||||
void truncate(
|
void truncate(
|
||||||
const ASTPtr & query,
|
const ASTPtr & query,
|
||||||
@ -70,11 +70,12 @@ protected:
|
|||||||
ASTPtr partition_by = nullptr);
|
ASTPtr partition_by = nullptr);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const String uri;
|
std::vector<const String> uris;
|
||||||
String format_name;
|
String format_name;
|
||||||
String compression_method;
|
String compression_method;
|
||||||
const bool distributed_processing;
|
const bool distributed_processing;
|
||||||
ASTPtr partition_by;
|
ASTPtr partition_by;
|
||||||
|
bool is_path_with_globs;
|
||||||
|
|
||||||
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
|
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
|
||||||
};
|
};
|
||||||
@ -95,6 +96,17 @@ public:
|
|||||||
std::shared_ptr<Impl> pimpl;
|
std::shared_ptr<Impl> pimpl;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class URISIterator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
URISIterator(const std::vector<const String> & uris_, ContextPtr context);
|
||||||
|
String next();
|
||||||
|
private:
|
||||||
|
class Impl;
|
||||||
|
/// shared_ptr to have copy constructor
|
||||||
|
std::shared_ptr<Impl> pimpl;
|
||||||
|
};
|
||||||
|
|
||||||
using IteratorWrapper = std::function<String()>;
|
using IteratorWrapper = std::function<String()>;
|
||||||
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;
|
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@ namespace ErrorCodes
|
|||||||
extern const int NETWORK_ERROR;
|
extern const int NETWORK_ERROR;
|
||||||
extern const int CANNOT_OPEN_FILE;
|
extern const int CANNOT_OPEN_FILE;
|
||||||
extern const int CANNOT_FSYNC;
|
extern const int CANNOT_FSYNC;
|
||||||
extern const int BAD_ARGUMENTS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -38,12 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
|
|||||||
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
|
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
|
||||||
const String path = hdfs_uri.substr(begin_of_path);
|
const String path = hdfs_uri.substr(begin_of_path);
|
||||||
|
|
||||||
if (path.find_first_of("*?{") != std::string::npos)
|
|
||||||
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
|
|
||||||
|
|
||||||
if (!hdfsExists(fs.get(), path.c_str()))
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
|
|
||||||
|
|
||||||
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
|
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
|
||||||
|
|
||||||
if (fout == nullptr)
|
if (fout == nullptr)
|
||||||
|
@ -65,6 +65,7 @@ namespace ErrorCodes
|
|||||||
extern const int INCOMPATIBLE_COLUMNS;
|
extern const int INCOMPATIBLE_COLUMNS;
|
||||||
extern const int CANNOT_STAT;
|
extern const int CANNOT_STAT;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int CANNOT_APPEND_TO_FILE;
|
||||||
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -285,6 +286,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
|
|||||||
{
|
{
|
||||||
is_db_table = false;
|
is_db_table = false;
|
||||||
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
|
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
|
||||||
|
is_path_with_globs = paths.size() > 1;
|
||||||
path_for_partitioned_write = table_path_;
|
path_for_partitioned_write = table_path_;
|
||||||
setStorageMetadata(args);
|
setStorageMetadata(args);
|
||||||
}
|
}
|
||||||
@ -603,7 +605,7 @@ public:
|
|||||||
int table_fd_,
|
int table_fd_,
|
||||||
bool use_table_fd_,
|
bool use_table_fd_,
|
||||||
std::string base_path_,
|
std::string base_path_,
|
||||||
std::vector<std::string> paths_,
|
std::string path_,
|
||||||
const CompressionMethod compression_method_,
|
const CompressionMethod compression_method_,
|
||||||
const std::optional<FormatSettings> & format_settings_,
|
const std::optional<FormatSettings> & format_settings_,
|
||||||
const String format_name_,
|
const String format_name_,
|
||||||
@ -615,7 +617,7 @@ public:
|
|||||||
, table_fd(table_fd_)
|
, table_fd(table_fd_)
|
||||||
, use_table_fd(use_table_fd_)
|
, use_table_fd(use_table_fd_)
|
||||||
, base_path(base_path_)
|
, base_path(base_path_)
|
||||||
, paths(paths_)
|
, path(path_)
|
||||||
, compression_method(compression_method_)
|
, compression_method(compression_method_)
|
||||||
, format_name(format_name_)
|
, format_name(format_name_)
|
||||||
, format_settings(format_settings_)
|
, format_settings(format_settings_)
|
||||||
@ -632,7 +634,7 @@ public:
|
|||||||
int table_fd_,
|
int table_fd_,
|
||||||
bool use_table_fd_,
|
bool use_table_fd_,
|
||||||
std::string base_path_,
|
std::string base_path_,
|
||||||
std::vector<std::string> paths_,
|
const std::string & path_,
|
||||||
const CompressionMethod compression_method_,
|
const CompressionMethod compression_method_,
|
||||||
const std::optional<FormatSettings> & format_settings_,
|
const std::optional<FormatSettings> & format_settings_,
|
||||||
const String format_name_,
|
const String format_name_,
|
||||||
@ -644,7 +646,7 @@ public:
|
|||||||
, table_fd(table_fd_)
|
, table_fd(table_fd_)
|
||||||
, use_table_fd(use_table_fd_)
|
, use_table_fd(use_table_fd_)
|
||||||
, base_path(base_path_)
|
, base_path(base_path_)
|
||||||
, paths(paths_)
|
, path(path_)
|
||||||
, compression_method(compression_method_)
|
, compression_method(compression_method_)
|
||||||
, format_name(format_name_)
|
, format_name(format_name_)
|
||||||
, format_settings(format_settings_)
|
, format_settings(format_settings_)
|
||||||
@ -666,10 +668,8 @@ public:
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (paths.size() != 1)
|
|
||||||
throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
|
|
||||||
flags |= O_WRONLY | O_APPEND | O_CREAT;
|
flags |= O_WRONLY | O_APPEND | O_CREAT;
|
||||||
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
|
naked_buffer = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// In case of formats with prefixes if file is not empty we have already written prefix.
|
/// In case of formats with prefixes if file is not empty we have already written prefix.
|
||||||
@ -709,7 +709,7 @@ private:
|
|||||||
int table_fd;
|
int table_fd;
|
||||||
bool use_table_fd;
|
bool use_table_fd;
|
||||||
std::string base_path;
|
std::string base_path;
|
||||||
std::vector<std::string> paths;
|
std::string path;
|
||||||
CompressionMethod compression_method;
|
CompressionMethod compression_method;
|
||||||
std::string format_name;
|
std::string format_name;
|
||||||
std::optional<FormatSettings> format_settings;
|
std::optional<FormatSettings> format_settings;
|
||||||
@ -752,7 +752,6 @@ public:
|
|||||||
{
|
{
|
||||||
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
|
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
|
||||||
PartitionedSink::validatePartitionKey(partition_path, true);
|
PartitionedSink::validatePartitionKey(partition_path, true);
|
||||||
Strings result_paths = {partition_path};
|
|
||||||
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path);
|
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path);
|
||||||
return std::make_shared<StorageFileSink>(
|
return std::make_shared<StorageFileSink>(
|
||||||
metadata_snapshot,
|
metadata_snapshot,
|
||||||
@ -760,7 +759,7 @@ public:
|
|||||||
-1,
|
-1,
|
||||||
/* use_table_fd */false,
|
/* use_table_fd */false,
|
||||||
base_path,
|
base_path,
|
||||||
result_paths,
|
partition_path,
|
||||||
compression_method,
|
compression_method,
|
||||||
format_settings,
|
format_settings,
|
||||||
format_name,
|
format_name,
|
||||||
@ -794,7 +793,6 @@ SinkToStoragePtr StorageFile::write(
|
|||||||
|
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
|
|
||||||
std::string path;
|
|
||||||
if (context->getSettingsRef().engine_file_truncate_on_insert)
|
if (context->getSettingsRef().engine_file_truncate_on_insert)
|
||||||
flags |= O_TRUNC;
|
flags |= O_TRUNC;
|
||||||
|
|
||||||
@ -815,7 +813,7 @@ SinkToStoragePtr StorageFile::write(
|
|||||||
std::unique_lock{rwlock, getLockTimeout(context)},
|
std::unique_lock{rwlock, getLockTimeout(context)},
|
||||||
base_path,
|
base_path,
|
||||||
path_for_partitioned_write,
|
path_for_partitioned_write,
|
||||||
chooseCompressionMethod(path, compression_method),
|
chooseCompressionMethod(path_for_partitioned_write, compression_method),
|
||||||
format_settings,
|
format_settings,
|
||||||
format_name,
|
format_name,
|
||||||
context,
|
context,
|
||||||
@ -823,10 +821,41 @@ SinkToStoragePtr StorageFile::write(
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
String path;
|
||||||
if (!paths.empty())
|
if (!paths.empty())
|
||||||
{
|
{
|
||||||
path = paths[0];
|
if (is_path_with_globs)
|
||||||
|
throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
|
||||||
|
|
||||||
|
path = paths.back();
|
||||||
fs::create_directories(fs::path(path).parent_path());
|
fs::create_directories(fs::path(path).parent_path());
|
||||||
|
|
||||||
|
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
|
||||||
|
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings) && fs::exists(paths.back())
|
||||||
|
&& fs::file_size(paths.back()) != 0)
|
||||||
|
{
|
||||||
|
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
|
||||||
|
{
|
||||||
|
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
|
||||||
|
size_t index = paths.size();
|
||||||
|
String new_path;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
|
||||||
|
++index;
|
||||||
|
}
|
||||||
|
while (fs::exists(new_path));
|
||||||
|
paths.push_back(new_path);
|
||||||
|
path = new_path;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::CANNOT_APPEND_TO_FILE,
|
||||||
|
"Cannot append data in format {} to file, because this format doesn't support appends."
|
||||||
|
" You can allow to create a new file "
|
||||||
|
"on each insert by enabling setting engine_file_allow_create_multiple_files",
|
||||||
|
format_name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::make_shared<StorageFileSink>(
|
return std::make_shared<StorageFileSink>(
|
||||||
@ -836,7 +865,7 @@ SinkToStoragePtr StorageFile::write(
|
|||||||
table_fd,
|
table_fd,
|
||||||
use_table_fd,
|
use_table_fd,
|
||||||
base_path,
|
base_path,
|
||||||
paths,
|
path,
|
||||||
chooseCompressionMethod(path, compression_method),
|
chooseCompressionMethod(path, compression_method),
|
||||||
format_settings,
|
format_settings,
|
||||||
format_name,
|
format_name,
|
||||||
@ -882,7 +911,7 @@ void StorageFile::truncate(
|
|||||||
ContextPtr /* context */,
|
ContextPtr /* context */,
|
||||||
TableExclusiveLockHolder &)
|
TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
if (paths.size() != 1)
|
if (is_path_with_globs)
|
||||||
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
|
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
|
||||||
|
|
||||||
if (use_table_fd)
|
if (use_table_fd)
|
||||||
@ -892,11 +921,14 @@ void StorageFile::truncate(
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!fs::exists(paths[0]))
|
for (const auto & path : paths)
|
||||||
return;
|
{
|
||||||
|
if (!fs::exists(path))
|
||||||
|
continue;
|
||||||
|
|
||||||
if (0 != ::truncate(paths[0].c_str(), 0))
|
if (0 != ::truncate(path.c_str(), 0))
|
||||||
throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE);
|
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +120,8 @@ private:
|
|||||||
size_t total_bytes_to_read = 0;
|
size_t total_bytes_to_read = 0;
|
||||||
|
|
||||||
String path_for_partitioned_write;
|
String path_for_partitioned_write;
|
||||||
|
|
||||||
|
bool is_path_with_globs = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ namespace ErrorCodes
|
|||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int S3_ERROR;
|
extern const int S3_ERROR;
|
||||||
extern const int UNEXPECTED_EXPRESSION;
|
extern const int UNEXPECTED_EXPRESSION;
|
||||||
extern const int CANNOT_OPEN_FILE;
|
extern const int DATABASE_ACCESS_DENIED;
|
||||||
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,8 +82,6 @@ public:
|
|||||||
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
|
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
|
||||||
: client(client_), globbed_uri(globbed_uri_)
|
: client(client_), globbed_uri(globbed_uri_)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
|
|
||||||
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
|
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
|
||||||
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
|
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
|
||||||
|
|
||||||
@ -176,6 +174,37 @@ String StorageS3Source::DisclosedGlobIterator::next()
|
|||||||
return pimpl->next();
|
return pimpl->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class StorageS3Source::KeysIterator::Impl
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit Impl(const std::vector<String> & keys_) : keys(keys_), keys_iter(keys.begin())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
String next()
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
if (keys_iter == keys.end())
|
||||||
|
return "";
|
||||||
|
auto key = *keys_iter;
|
||||||
|
++keys_iter;
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex mutex;
|
||||||
|
Strings keys;
|
||||||
|
Strings::iterator keys_iter;
|
||||||
|
};
|
||||||
|
|
||||||
|
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
String StorageS3Source::KeysIterator::next()
|
||||||
|
{
|
||||||
|
return pimpl->next();
|
||||||
|
}
|
||||||
|
|
||||||
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
|
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
|
||||||
{
|
{
|
||||||
@ -296,6 +325,39 @@ Chunk StorageS3Source::generate()
|
|||||||
return generate();
|
return generate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool checkIfObjectExists(const std::shared_ptr<Aws::S3::S3Client> & client, const String & bucket, const String & key)
|
||||||
|
{
|
||||||
|
bool is_finished = false;
|
||||||
|
Aws::S3::Model::ListObjectsV2Request request;
|
||||||
|
Aws::S3::Model::ListObjectsV2Outcome outcome;
|
||||||
|
|
||||||
|
request.SetBucket(bucket);
|
||||||
|
request.SetPrefix(key);
|
||||||
|
while (!is_finished)
|
||||||
|
{
|
||||||
|
outcome = client->ListObjectsV2(request);
|
||||||
|
if (!outcome.IsSuccess())
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::S3_ERROR,
|
||||||
|
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||||
|
quoteString(bucket),
|
||||||
|
quoteString(key),
|
||||||
|
backQuote(outcome.GetError().GetExceptionName()),
|
||||||
|
quoteString(outcome.GetError().GetMessage()));
|
||||||
|
|
||||||
|
const auto & result_batch = outcome.GetResult().GetContents();
|
||||||
|
for (const auto & obj : result_batch)
|
||||||
|
{
|
||||||
|
if (obj.GetKey() == key)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
|
||||||
|
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
class StorageS3Sink : public SinkToStorage
|
class StorageS3Sink : public SinkToStorage
|
||||||
{
|
{
|
||||||
@ -315,9 +377,6 @@ public:
|
|||||||
, sample_block(sample_block_)
|
, sample_block(sample_block_)
|
||||||
, format_settings(format_settings_)
|
, format_settings(format_settings_)
|
||||||
{
|
{
|
||||||
if (key.find_first_of("*?{") != std::string::npos)
|
|
||||||
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "S3 key '{}' contains globs, so the table is in readonly mode", key);
|
|
||||||
|
|
||||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||||
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
|
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
|
||||||
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
|
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
|
||||||
@ -419,7 +478,6 @@ private:
|
|||||||
std::optional<FormatSettings> format_settings;
|
std::optional<FormatSettings> format_settings;
|
||||||
|
|
||||||
ExpressionActionsPtr partition_by_expr;
|
ExpressionActionsPtr partition_by_expr;
|
||||||
String partition_by_column_name;
|
|
||||||
|
|
||||||
static void validateBucket(const String & str)
|
static void validateBucket(const String & str)
|
||||||
{
|
{
|
||||||
@ -468,6 +526,7 @@ StorageS3::StorageS3(
|
|||||||
ASTPtr partition_by_)
|
ASTPtr partition_by_)
|
||||||
: IStorage(table_id_)
|
: IStorage(table_id_)
|
||||||
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
|
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
|
||||||
|
, keys({uri_.key})
|
||||||
, format_name(format_name_)
|
, format_name(format_name_)
|
||||||
, max_single_read_retries(max_single_read_retries_)
|
, max_single_read_retries(max_single_read_retries_)
|
||||||
, min_upload_part_size(min_upload_part_size_)
|
, min_upload_part_size(min_upload_part_size_)
|
||||||
@ -477,6 +536,7 @@ StorageS3::StorageS3(
|
|||||||
, distributed_processing(distributed_processing_)
|
, distributed_processing(distributed_processing_)
|
||||||
, format_settings(format_settings_)
|
, format_settings(format_settings_)
|
||||||
, partition_by(partition_by_)
|
, partition_by(partition_by_)
|
||||||
|
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
|
||||||
{
|
{
|
||||||
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
|
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
|
||||||
StorageInMemoryMetadata storage_metadata;
|
StorageInMemoryMetadata storage_metadata;
|
||||||
@ -484,7 +544,7 @@ StorageS3::StorageS3(
|
|||||||
updateClientAndAuthSettings(context_, client_auth);
|
updateClientAndAuthSettings(context_, client_auth);
|
||||||
if (columns_.empty())
|
if (columns_.empty())
|
||||||
{
|
{
|
||||||
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, format_settings, context_);
|
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, is_key_with_globs, format_settings, context_);
|
||||||
storage_metadata.setColumns(columns);
|
storage_metadata.setColumns(columns);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -495,9 +555,8 @@ StorageS3::StorageS3(
|
|||||||
setInMemoryMetadata(storage_metadata);
|
setInMemoryMetadata(storage_metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context)
|
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
|
||||||
{
|
{
|
||||||
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
|
|
||||||
if (distributed_processing)
|
if (distributed_processing)
|
||||||
{
|
{
|
||||||
return std::make_shared<StorageS3Source::IteratorWrapper>(
|
return std::make_shared<StorageS3Source::IteratorWrapper>(
|
||||||
@ -505,13 +564,23 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
|
|||||||
return callback();
|
return callback();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
else if (is_key_with_globs)
|
||||||
/// Iterate through disclosed globs and make a source for each file
|
|
||||||
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
|
|
||||||
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
|
|
||||||
{
|
{
|
||||||
return glob_iterator->next();
|
/// Iterate through disclosed globs and make a source for each file
|
||||||
});
|
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
|
||||||
|
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
|
||||||
|
{
|
||||||
|
return glob_iterator->next();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(keys);
|
||||||
|
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]()
|
||||||
|
{
|
||||||
|
return keys_iterator->next();
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Pipe StorageS3::read(
|
Pipe StorageS3::read(
|
||||||
@ -536,7 +605,7 @@ Pipe StorageS3::read(
|
|||||||
need_file_column = true;
|
need_file_column = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, distributed_processing, local_context);
|
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
|
||||||
|
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
{
|
{
|
||||||
@ -567,8 +636,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
updateClientAndAuthSettings(local_context, client_auth);
|
updateClientAndAuthSettings(local_context, client_auth);
|
||||||
|
|
||||||
auto sample_block = metadata_snapshot->getSampleBlock();
|
auto sample_block = metadata_snapshot->getSampleBlock();
|
||||||
auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method);
|
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
|
||||||
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos;
|
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
|
||||||
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
|
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
|
||||||
|
|
||||||
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
||||||
@ -585,12 +654,41 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
client_auth.client,
|
client_auth.client,
|
||||||
client_auth.uri.bucket,
|
client_auth.uri.bucket,
|
||||||
client_auth.uri.key,
|
keys.back(),
|
||||||
min_upload_part_size,
|
min_upload_part_size,
|
||||||
max_single_part_upload_size);
|
max_single_part_upload_size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
if (is_key_with_globs)
|
||||||
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
|
||||||
|
|
||||||
|
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
|
||||||
|
|
||||||
|
if (!truncate_in_insert && checkIfObjectExists(client_auth.client, client_auth.uri.bucket, keys.back()))
|
||||||
|
{
|
||||||
|
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
|
||||||
|
{
|
||||||
|
size_t index = keys.size();
|
||||||
|
auto pos = keys[0].find_first_of('.');
|
||||||
|
String new_key;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
|
||||||
|
++index;
|
||||||
|
}
|
||||||
|
while (checkIfObjectExists(client_auth.client, client_auth.uri.bucket, new_key));
|
||||||
|
keys.push_back(new_key);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::BAD_ARGUMENTS,
|
||||||
|
"Object in bucket {} with key {} already exists. If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
|
||||||
|
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
|
||||||
|
client_auth.uri.bucket,
|
||||||
|
keys.back());
|
||||||
|
}
|
||||||
|
|
||||||
return std::make_shared<StorageS3Sink>(
|
return std::make_shared<StorageS3Sink>(
|
||||||
format_name,
|
format_name,
|
||||||
sample_block,
|
sample_block,
|
||||||
@ -599,7 +697,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
client_auth.client,
|
client_auth.client,
|
||||||
client_auth.uri.bucket,
|
client_auth.uri.bucket,
|
||||||
client_auth.uri.key,
|
keys.back(),
|
||||||
min_upload_part_size,
|
min_upload_part_size,
|
||||||
max_single_part_upload_size);
|
max_single_part_upload_size);
|
||||||
}
|
}
|
||||||
@ -610,11 +708,17 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
|
|||||||
{
|
{
|
||||||
updateClientAndAuthSettings(local_context, client_auth);
|
updateClientAndAuthSettings(local_context, client_auth);
|
||||||
|
|
||||||
Aws::S3::Model::ObjectIdentifier obj;
|
if (is_key_with_globs)
|
||||||
obj.SetKey(client_auth.uri.key);
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
|
||||||
|
|
||||||
Aws::S3::Model::Delete delkeys;
|
Aws::S3::Model::Delete delkeys;
|
||||||
delkeys.AddObjects(std::move(obj));
|
|
||||||
|
for (const auto & key : keys)
|
||||||
|
{
|
||||||
|
Aws::S3::Model::ObjectIdentifier obj;
|
||||||
|
obj.SetKey(key);
|
||||||
|
delkeys.AddObjects(std::move(obj));
|
||||||
|
}
|
||||||
|
|
||||||
Aws::S3::Model::DeleteObjectsRequest request;
|
Aws::S3::Model::DeleteObjectsRequest request;
|
||||||
request.SetBucket(client_auth.uri.bucket);
|
request.SetBucket(client_auth.uri.bucket);
|
||||||
@ -734,7 +838,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
|
|||||||
{
|
{
|
||||||
ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}};
|
ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}};
|
||||||
updateClientAndAuthSettings(ctx, client_auth);
|
updateClientAndAuthSettings(ctx, client_auth);
|
||||||
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, format_settings, ctx);
|
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||||
@ -743,12 +847,14 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
|||||||
UInt64 max_single_read_retries,
|
UInt64 max_single_read_retries,
|
||||||
const String & compression_method,
|
const String & compression_method,
|
||||||
bool distributed_processing,
|
bool distributed_processing,
|
||||||
|
bool is_key_with_globs,
|
||||||
const std::optional<FormatSettings> & format_settings,
|
const std::optional<FormatSettings> & format_settings,
|
||||||
ContextPtr ctx)
|
ContextPtr ctx)
|
||||||
{
|
{
|
||||||
|
std::vector<String> keys = {client_auth.uri.key};
|
||||||
auto read_buffer_creator = [&]()
|
auto read_buffer_creator = [&]()
|
||||||
{
|
{
|
||||||
auto file_iterator = createFileIterator(client_auth, distributed_processing, ctx);
|
auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
|
||||||
String current_key = (*file_iterator)();
|
String current_key = (*file_iterator)();
|
||||||
if (current_key.empty())
|
if (current_key.empty())
|
||||||
throw Exception(
|
throw Exception(
|
||||||
|
@ -44,6 +44,18 @@ public:
|
|||||||
std::shared_ptr<Impl> pimpl;
|
std::shared_ptr<Impl> pimpl;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class KeysIterator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit KeysIterator(const std::vector<String> & keys_);
|
||||||
|
String next();
|
||||||
|
|
||||||
|
private:
|
||||||
|
class Impl;
|
||||||
|
/// shared_ptr to have copy constructor
|
||||||
|
std::shared_ptr<Impl> pimpl;
|
||||||
|
};
|
||||||
|
|
||||||
using IteratorWrapper = std::function<String()>;
|
using IteratorWrapper = std::function<String()>;
|
||||||
|
|
||||||
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
|
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
|
||||||
@ -174,6 +186,7 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
ClientAuthentication client_auth;
|
ClientAuthentication client_auth;
|
||||||
|
std::vector<String> keys;
|
||||||
|
|
||||||
String format_name;
|
String format_name;
|
||||||
UInt64 max_single_read_retries;
|
UInt64 max_single_read_retries;
|
||||||
@ -184,10 +197,11 @@ private:
|
|||||||
const bool distributed_processing;
|
const bool distributed_processing;
|
||||||
std::optional<FormatSettings> format_settings;
|
std::optional<FormatSettings> format_settings;
|
||||||
ASTPtr partition_by;
|
ASTPtr partition_by;
|
||||||
|
bool is_key_with_globs = false;
|
||||||
|
|
||||||
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
|
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
|
||||||
|
|
||||||
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context);
|
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context);
|
||||||
|
|
||||||
static ColumnsDescription getTableStructureFromDataImpl(
|
static ColumnsDescription getTableStructureFromDataImpl(
|
||||||
const String & format,
|
const String & format,
|
||||||
@ -195,6 +209,7 @@ private:
|
|||||||
UInt64 max_single_read_retries,
|
UInt64 max_single_read_retries,
|
||||||
const String & compression_method,
|
const String & compression_method,
|
||||||
bool distributed_processing,
|
bool distributed_processing,
|
||||||
|
bool is_key_with_globs,
|
||||||
const std::optional<FormatSettings> & format_settings,
|
const std::optional<FormatSettings> & format_settings,
|
||||||
ContextPtr ctx);
|
ContextPtr ctx);
|
||||||
};
|
};
|
||||||
|
@ -366,6 +366,43 @@ def test_hdfs_directory_not_exist(started_cluster):
|
|||||||
node1.query(ddl)
|
node1.query(ddl)
|
||||||
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
|
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
|
||||||
|
|
||||||
|
def test_overwrite(started_cluster):
|
||||||
|
hdfs_api = started_cluster.hdfs_api
|
||||||
|
|
||||||
|
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
|
||||||
|
node1.query(f"create table test_overwrite as {table_function}")
|
||||||
|
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(5)")
|
||||||
|
node1.query_and_get_error(f"insert into test_overwrite select number, randomString(100) FROM numbers(10)")
|
||||||
|
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
|
||||||
|
|
||||||
|
result = node1.query(f"select count() from test_overwrite")
|
||||||
|
assert(int(result) == 10)
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_inserts(started_cluster):
|
||||||
|
hdfs_api = started_cluster.hdfs_api
|
||||||
|
|
||||||
|
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
|
||||||
|
node1.query(f"create table test_multiple_inserts as {table_function}")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1")
|
||||||
|
|
||||||
|
result = node1.query(f"select count() from test_multiple_inserts")
|
||||||
|
assert(int(result) == 60)
|
||||||
|
|
||||||
|
result = node1.query(f"drop table test_multiple_inserts")
|
||||||
|
|
||||||
|
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
|
||||||
|
node1.query(f"create table test_multiple_inserts as {table_function}")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1")
|
||||||
|
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1")
|
||||||
|
|
||||||
|
result = node1.query(f"select count() from test_multiple_inserts")
|
||||||
|
assert(int(result) == 60)
|
||||||
|
|
||||||
|
|
||||||
def test_format_detection(started_cluster):
|
def test_format_detection(started_cluster):
|
||||||
node1.query(f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')")
|
node1.query(f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')")
|
||||||
node1.query(f"insert into arrow_table select 1")
|
node1.query(f"insert into arrow_table select 1")
|
||||||
|
@ -10,6 +10,11 @@
|
|||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
</s3_parquet>
|
</s3_parquet>
|
||||||
|
<s3_parquet_gz>
|
||||||
|
<url>http://minio1:9001/root/test_parquet_gz</url>
|
||||||
|
<access_key_id>minio</access_key_id>
|
||||||
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
</s3_parquet_gz>
|
||||||
<s3_orc>
|
<s3_orc>
|
||||||
<url>http://minio1:9001/root/test_orc</url>
|
<url>http://minio1:9001/root/test_orc</url>
|
||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
|
@ -136,7 +136,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
|
|||||||
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
|
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
|
||||||
filename = "test.csv"
|
filename = "test.csv"
|
||||||
put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
|
put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
|
||||||
{maybe_auth}'CSV', '{table_format}', {compression}) values {values}"""
|
{maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
run_query(instance, put_query)
|
run_query(instance, put_query)
|
||||||
@ -298,7 +298,7 @@ def test_put_csv(started_cluster, maybe_auth, positive):
|
|||||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||||
filename = "test.csv"
|
filename = "test.csv"
|
||||||
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
|
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV settings s3_truncate_on_insert=1".format(
|
||||||
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
|
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
|
||||||
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
|
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
|
||||||
|
|
||||||
@ -322,7 +322,7 @@ def test_put_get_with_redirect(started_cluster):
|
|||||||
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
||||||
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
|
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
|
||||||
filename = "test.csv"
|
filename = "test.csv"
|
||||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
|
||||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||||
run_query(instance, query)
|
run_query(instance, query)
|
||||||
|
|
||||||
@ -350,12 +350,12 @@ def test_put_with_zero_redirect(started_cluster):
|
|||||||
filename = "test.csv"
|
filename = "test.csv"
|
||||||
|
|
||||||
# Should work without redirect
|
# Should work without redirect
|
||||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
|
||||||
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
|
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
|
||||||
run_query(instance, query)
|
run_query(instance, query)
|
||||||
|
|
||||||
# Should not work with redirect
|
# Should not work with redirect
|
||||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
|
||||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||||
exception_raised = False
|
exception_raised = False
|
||||||
try:
|
try:
|
||||||
@ -805,13 +805,13 @@ def test_seekable_formats(started_cluster):
|
|||||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||||
|
|
||||||
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
||||||
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
|
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
|
||||||
|
|
||||||
result = instance.query(f"SELECT count() FROM {table_function}")
|
result = instance.query(f"SELECT count() FROM {table_function}")
|
||||||
assert(int(result) == 5000000)
|
assert(int(result) == 5000000)
|
||||||
|
|
||||||
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
|
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
|
||||||
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
|
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
|
||||||
|
|
||||||
result = instance.query(f"SELECT count() FROM {table_function}")
|
result = instance.query(f"SELECT count() FROM {table_function}")
|
||||||
assert(int(result) == 5000000)
|
assert(int(result) == 5000000)
|
||||||
@ -827,14 +827,14 @@ def test_seekable_formats_url(started_cluster):
|
|||||||
instance = started_cluster.instances["dummy"]
|
instance = started_cluster.instances["dummy"]
|
||||||
|
|
||||||
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
||||||
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
|
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
|
||||||
|
|
||||||
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
|
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
|
||||||
result = instance.query(f"SELECT count() FROM {table_function}")
|
result = instance.query(f"SELECT count() FROM {table_function}")
|
||||||
assert(int(result) == 5000000)
|
assert(int(result) == 5000000)
|
||||||
|
|
||||||
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
|
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
|
||||||
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
|
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
|
||||||
|
|
||||||
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
|
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
|
||||||
result = instance.query(f"SELECT count() FROM {table_function}")
|
result = instance.query(f"SELECT count() FROM {table_function}")
|
||||||
@ -917,6 +917,48 @@ def test_empty_file(started_cluster):
|
|||||||
assert(int(result) == 0)
|
assert(int(result) == 0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_overwrite(started_cluster):
|
||||||
|
bucket = started_cluster.minio_bucket
|
||||||
|
instance = started_cluster.instances["dummy"]
|
||||||
|
|
||||||
|
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
||||||
|
instance.query(f"create table test_overwrite as {table_function}")
|
||||||
|
instance.query(f"truncate table test_overwrite")
|
||||||
|
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1")
|
||||||
|
instance.query_and_get_error(f"insert into test_overwrite select number, randomString(100) from numbers(100)")
|
||||||
|
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1")
|
||||||
|
|
||||||
|
result = instance.query(f"select count() from test_overwrite")
|
||||||
|
assert(int(result) == 200)
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_new_files_on_insert(started_cluster):
|
||||||
|
bucket = started_cluster.minio_bucket
|
||||||
|
instance = started_cluster.instances["dummy"]
|
||||||
|
|
||||||
|
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
|
||||||
|
instance.query(f"create table test_multiple_inserts as {table_function}")
|
||||||
|
instance.query(f"truncate table test_multiple_inserts")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
|
||||||
|
|
||||||
|
result = instance.query(f"select count() from test_multiple_inserts")
|
||||||
|
assert(int(result) == 60)
|
||||||
|
|
||||||
|
instance.query(f"drop table test_multiple_inserts")
|
||||||
|
|
||||||
|
table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
|
||||||
|
instance.query(f"create table test_multiple_inserts as {table_function}")
|
||||||
|
instance.query(f"truncate table test_multiple_inserts")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
|
||||||
|
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
|
||||||
|
|
||||||
|
result = instance.query(f"select count() from test_multiple_inserts")
|
||||||
|
assert(int(result) == 60)
|
||||||
|
|
||||||
|
|
||||||
def test_format_detection(started_cluster):
|
def test_format_detection(started_cluster):
|
||||||
bucket = started_cluster.minio_bucket
|
bucket = started_cluster.minio_bucket
|
||||||
instance = started_cluster.instances["dummy"]
|
instance = started_cluster.instances["dummy"]
|
||||||
|
@ -0,0 +1,100 @@
|
|||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
10
|
||||||
|
11
|
||||||
|
12
|
||||||
|
13
|
||||||
|
14
|
||||||
|
15
|
||||||
|
16
|
||||||
|
17
|
||||||
|
18
|
||||||
|
19
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
10
|
||||||
|
11
|
||||||
|
12
|
||||||
|
13
|
||||||
|
14
|
||||||
|
15
|
||||||
|
16
|
||||||
|
17
|
||||||
|
18
|
||||||
|
19
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
10
|
||||||
|
11
|
||||||
|
12
|
||||||
|
13
|
||||||
|
14
|
||||||
|
15
|
||||||
|
16
|
||||||
|
17
|
||||||
|
18
|
||||||
|
19
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
10
|
||||||
|
11
|
||||||
|
12
|
||||||
|
13
|
||||||
|
14
|
||||||
|
15
|
||||||
|
16
|
||||||
|
17
|
||||||
|
18
|
||||||
|
19
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
10
|
||||||
|
11
|
||||||
|
12
|
||||||
|
13
|
||||||
|
14
|
||||||
|
15
|
||||||
|
16
|
||||||
|
17
|
||||||
|
18
|
||||||
|
19
|
@ -0,0 +1,39 @@
|
|||||||
|
-- Tags: no-fasttest, no-parallel
|
||||||
|
|
||||||
|
drop table if exists test;
|
||||||
|
create table test (number UInt64) engine=File('Parquet');
|
||||||
|
insert into test select * from numbers(10);
|
||||||
|
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
|
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
|
||||||
|
select * from test order by number;
|
||||||
|
truncate table test;
|
||||||
|
drop table test;
|
||||||
|
|
||||||
|
create table test (number UInt64) engine=File('Parquet', 'test_02155/test1/data.Parquet');
|
||||||
|
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
|
||||||
|
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
|
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
|
||||||
|
select * from test order by number;
|
||||||
|
drop table test;
|
||||||
|
|
||||||
|
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
|
||||||
|
select * from file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64');
|
||||||
|
select * from file(concat(currentDatabase(), '/test2/data.1.Parquet'), 'Parquet', 'number UInt64');
|
||||||
|
|
||||||
|
create table test (number UInt64) engine=File('Parquet', 'test_02155/test3/data.Parquet.gz');
|
||||||
|
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
|
||||||
|
;
|
||||||
|
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
|
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
|
||||||
|
select * from test order by number;
|
||||||
|
drop table test;
|
||||||
|
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
|
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
|
||||||
|
select * from file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64');
|
||||||
|
select * from file(concat(currentDatabase(), '/test4/data.1.Parquet.gz'), 'Parquet', 'number UInt64');
|
||||||
|
|
@ -1,5 +1,5 @@
|
|||||||
-- Tags: no-fasttest
|
-- Tags: no-fasttest, no-parallel
|
||||||
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
|
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10);
|
||||||
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
|
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
|
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
|
||||||
select 'OK';
|
select 'OK';
|
||||||
|
Loading…
Reference in New Issue
Block a user