Merge pull request #34555 from kitaisreal/table-functions-insert-partition-by-refactoring

TableFunctionFile added performance test
This commit is contained in:
alexey-milovidov 2022-02-13 09:08:51 +03:00 committed by GitHub
commit 1774836d4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 2 deletions

View File

@ -199,18 +199,27 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
fs_table_path = user_files_absolute_path / fs_table_path; fs_table_path = user_files_absolute_path / fs_table_path;
Strings paths; Strings paths;
/// Do not use fs::canonical or fs::weakly_canonical. /// Do not use fs::canonical or fs::weakly_canonical.
/// Otherwise it will not allow to work with symlinks in `user_files_path` directory. /// Otherwise it will not allow to work with symlinks in `user_files_path` directory.
String path = fs::absolute(fs_table_path).lexically_normal(); /// Normalize path. String path = fs::absolute(fs_table_path).lexically_normal(); /// Normalize path.
if (path.find_first_of("*?{") == std::string::npos)
if (path.find(PartitionedSink::PARTITION_ID_WILDCARD) != std::string::npos)
{
paths.push_back(path);
}
else if (path.find_first_of("*?{") == std::string::npos)
{ {
std::error_code error; std::error_code error;
if (fs::exists(path)) if (fs::exists(path))
total_bytes_to_read += fs::file_size(path, error); total_bytes_to_read += fs::file_size(path, error);
paths.push_back(path); paths.push_back(path);
} }
else else
{
paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read); paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
}
for (const auto & cur_path : paths) for (const auto & cur_path : paths)
checkCreationIsAllowed(context, user_files_absolute_path, cur_path); checkCreationIsAllowed(context, user_files_absolute_path, cur_path);
@ -313,7 +322,11 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
is_db_table = false; is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read); paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
is_path_with_globs = paths.size() > 1; is_path_with_globs = paths.size() > 1;
path_for_partitioned_write = table_path_; if (!paths.empty())
path_for_partitioned_write = paths.front();
else
path_for_partitioned_write = table_path_;
setStorageMetadata(args); setStorageMetadata(args);
} }
@ -853,6 +866,7 @@ SinkToStoragePtr StorageFile::write(
{ {
if (path_for_partitioned_write.empty()) if (path_for_partitioned_write.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write");
fs::create_directories(fs::path(path_for_partitioned_write).parent_path()); fs::create_directories(fs::path(path_for_partitioned_write).parent_path());
return std::make_shared<PartitionedStorageFileSink>( return std::make_shared<PartitionedStorageFileSink>(

View File

@ -0,0 +1,54 @@
<test>
<!--
Performance test for INSERT INTO FUNCTION file(...), with and without PARTITION BY.
Every query below writes 1000000 rows generated by numbers(1000000).
-->
<substitutions>
<!-- Output formats referenced by the queries through the {format} placeholder. -->
<substitution>
<name>format</name>
<values>
<value>TabSeparated</value>
<value>TabSeparatedWithNames</value>
<value>TabSeparatedWithNamesAndTypes</value>
<value>CSV</value>
<value>CSVWithNames</value>
<value>Values</value>
<value>JSONEachRow</value>
<value>JSONCompactEachRow</value>
<value>JSONCompactEachRowWithNamesAndTypes</value>
<value>TSKV</value>
<value>RowBinary</value>
<value>Native</value>
<value>MsgPack</value>
</values>
</substitution>
<!--
Modulus used by the partitioned queries (number % {partitions_count}),
i.e. the number of distinct partition_id values those queries produce.
-->
<substitution>
<name>partitions_count</name>
<values>
<value>5</value>
<value>50</value>
<value>500</value>
</values>
</substitution>
</substitutions>
<!-- Non-partitioned insert into 'test_file', two UInt64 columns. -->
<query>
INSERT INTO FUNCTION file('test_file', '{format}', 'key UInt64, value UInt64')
SELECT number, number FROM numbers(1000000)
</query>
<!-- Non-partitioned insert into 'test_file', six UInt64 columns. -->
<query>
INSERT INTO FUNCTION file('test_file', '{format}', 'key UInt64, value1 UInt64, value2 UInt64, value3 UInt64, value4 UInt64, value5 UInt64')
SELECT number, number, number, number, number, number FROM numbers(1000000)
</query>
<!--
Partitioned insert, two UInt64 columns, PARTITION BY partition_id.
NOTE(review): the doubled braces around _partition_id presumably escape the
harness's {name} substitution syntax so that a literal {_partition_id}
placeholder reaches the server; confirm against the test runner.
-->
<query>
INSERT INTO FUNCTION file('test_file_{{_partition_id}}', '{format}', 'partition_id UInt64, value UInt64')
PARTITION BY partition_id
SELECT (number % {partitions_count}) as partition_id, number FROM numbers(1000000)
</query>
<!-- Partitioned insert, six UInt64 columns, same file-name pattern and partition key as the query above. -->
<query>
INSERT INTO FUNCTION file('test_file_{{_partition_id}}', '{format}', 'partition_id UInt64, value1 UInt64, value2 UInt64, value3 UInt64, value4 UInt64, value5 UInt64')
PARTITION BY partition_id
SELECT (number % {partitions_count}) as partition_id, number, number, number, number, number FROM numbers(1000000)
</query>
</test>