mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Better
This commit is contained in:
parent
16cdd067e3
commit
9628874ac1
@ -1092,7 +1092,7 @@ void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
|
|||||||
|
|
||||||
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
|
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
|
||||||
{
|
{
|
||||||
String current_uri = uris.back();
|
String current_uri = uris.front();
|
||||||
|
|
||||||
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
|
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
|
||||||
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
|
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
|
||||||
@ -1114,7 +1114,7 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
|
|||||||
if (is_path_with_globs)
|
if (is_path_with_globs)
|
||||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
|
||||||
|
|
||||||
if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, current_uri, uris.size()))
|
if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, uris.front(), uris.size()))
|
||||||
{
|
{
|
||||||
uris.push_back(*new_uri);
|
uris.push_back(*new_uri);
|
||||||
current_uri = *new_uri;
|
current_uri = *new_uri;
|
||||||
|
@ -868,8 +868,9 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons
|
|||||||
|
|
||||||
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
|
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
|
||||||
{
|
{
|
||||||
|
auto path = configuration.blobs_paths.front();
|
||||||
auto sample_block = metadata_snapshot->getSampleBlock();
|
auto sample_block = metadata_snapshot->getSampleBlock();
|
||||||
auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method);
|
auto chosen_compression_method = chooseCompressionMethod(path, configuration.compression_method);
|
||||||
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
|
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
|
||||||
|
|
||||||
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
|
||||||
@ -885,7 +886,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
|
|||||||
format_settings,
|
format_settings,
|
||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
object_storage.get(),
|
object_storage.get(),
|
||||||
configuration.blobs_paths.back());
|
path);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -893,8 +894,11 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
|
|||||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||||
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
|
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
|
||||||
|
|
||||||
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), configuration.blobs_paths.back(), configuration.blobs_paths.size()))
|
if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), path, configuration.blobs_paths.size()))
|
||||||
|
{
|
||||||
configuration.blobs_paths.push_back(*new_path);
|
configuration.blobs_paths.push_back(*new_path);
|
||||||
|
path = *new_path;
|
||||||
|
}
|
||||||
|
|
||||||
return std::make_shared<StorageAzureBlobSink>(
|
return std::make_shared<StorageAzureBlobSink>(
|
||||||
configuration.format,
|
configuration.format,
|
||||||
@ -903,7 +907,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta
|
|||||||
format_settings,
|
format_settings,
|
||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
object_storage.get(),
|
object_storage.get(),
|
||||||
configuration.blobs_paths.back());
|
path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1968,22 +1968,22 @@ SinkToStoragePtr StorageFile::write(
|
|||||||
"Table '{}' is in readonly mode because of globs in filepath",
|
"Table '{}' is in readonly mode because of globs in filepath",
|
||||||
getStorageID().getNameForLogs());
|
getStorageID().getNameForLogs());
|
||||||
|
|
||||||
path = paths.back();
|
path = paths.front();
|
||||||
fs::create_directories(fs::path(path).parent_path());
|
fs::create_directories(fs::path(path).parent_path());
|
||||||
|
|
||||||
std::error_code error_code;
|
std::error_code error_code;
|
||||||
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
|
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
|
||||||
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
|
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
|
||||||
&& fs::file_size(paths.back(), error_code) != 0 && !error_code)
|
&& fs::file_size(path, error_code) != 0 && !error_code)
|
||||||
{
|
{
|
||||||
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
|
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
|
||||||
{
|
{
|
||||||
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
|
auto pos = path.find_first_of('.', path.find_last_of('/'));
|
||||||
size_t index = paths.size();
|
size_t index = paths.size();
|
||||||
String new_path;
|
String new_path;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
|
new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos));
|
||||||
++index;
|
++index;
|
||||||
}
|
}
|
||||||
while (fs::exists(new_path));
|
while (fs::exists(new_path));
|
||||||
|
@ -1281,6 +1281,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
|
|||||||
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
|
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
|
||||||
{
|
{
|
||||||
auto query_configuration = updateConfigurationAndGetCopy(local_context);
|
auto query_configuration = updateConfigurationAndGetCopy(local_context);
|
||||||
|
auto key = query_configuration.keys.front();
|
||||||
|
|
||||||
auto sample_block = metadata_snapshot->getSampleBlock();
|
auto sample_block = metadata_snapshot->getSampleBlock();
|
||||||
auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
|
auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
|
||||||
@ -1300,7 +1301,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
query_configuration,
|
query_configuration,
|
||||||
query_configuration.url.bucket,
|
query_configuration.url.bucket,
|
||||||
query_configuration.keys.back());
|
key);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1308,10 +1309,11 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||||
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
|
"S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);
|
||||||
|
|
||||||
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.back(), query_configuration.keys.size()))
|
if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size()))
|
||||||
{
|
{
|
||||||
query_configuration.keys.push_back(*new_key);
|
query_configuration.keys.push_back(*new_key);
|
||||||
configuration.keys.push_back(*new_key);
|
configuration.keys.push_back(*new_key);
|
||||||
|
key = *new_key;
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::make_shared<StorageS3Sink>(
|
return std::make_shared<StorageS3Sink>(
|
||||||
@ -1322,7 +1324,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
|||||||
chosen_compression_method,
|
chosen_compression_method,
|
||||||
query_configuration,
|
query_configuration,
|
||||||
query_configuration.url.bucket,
|
query_configuration.url.bucket,
|
||||||
query_configuration.keys.back());
|
key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user