2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/StorageFile.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Storages/StorageFactory.h>
|
2022-06-23 20:04:06 +00:00
|
|
|
#include <Storages/ColumnsDescription.h>
|
|
|
|
#include <Storages/StorageInMemoryMetadata.h>
|
2023-05-29 20:08:18 +00:00
|
|
|
#include <Storages/PartitionedSink.h>
|
2023-01-21 16:01:41 +00:00
|
|
|
#include <Storages/Distributed/DistributedAsyncInsertSource.h>
|
2022-06-23 20:04:06 +00:00
|
|
|
#include <Storages/checkAndGetLiteralArgument.h>
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
#include <Interpreters/evaluateConstantExpression.h>
|
|
|
|
|
2020-11-02 07:50:38 +00:00
|
|
|
#include <Parsers/ASTCreateQuery.h>
|
2021-11-26 15:49:40 +00:00
|
|
|
#include <Parsers/ASTIdentifier_fwd.h>
|
2021-10-25 16:23:44 +00:00
|
|
|
#include <Parsers/ASTInsertQuery.h>
|
2021-11-26 15:49:40 +00:00
|
|
|
#include <Parsers/ASTLiteral.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
|
2022-12-04 22:27:28 +00:00
|
|
|
#include <IO/MMapReadBufferFromFile.h>
|
|
|
|
#include <IO/MMapReadBufferFromFileDescriptor.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2021-07-18 12:55:24 +00:00
|
|
|
#include <IO/ReadBufferFromFileDescriptor.h>
|
2019-11-20 14:48:01 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/WriteBufferFromFile.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2023-07-28 11:55:23 +00:00
|
|
|
#include <IO/Archives/createArchiveReader.h>
|
|
|
|
#include <IO/Archives/IArchiveReader.h>
|
2017-12-30 00:36:06 +00:00
|
|
|
|
2023-06-16 19:38:50 +00:00
|
|
|
#include <DataTypes/DataTypeLowCardinality.h>
|
2020-01-15 07:52:45 +00:00
|
|
|
#include <DataTypes/DataTypeString.h>
|
2021-12-15 11:30:57 +00:00
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <Formats/ReadSchemaUtils.h>
|
2021-07-23 14:25:35 +00:00
|
|
|
#include <Processors/Sinks/SinkToStorage.h>
|
2021-07-21 16:13:17 +00:00
|
|
|
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
2022-06-23 20:04:06 +00:00
|
|
|
#include <Processors/ISource.h>
|
2023-05-29 20:08:18 +00:00
|
|
|
#include <Processors/Formats/IOutputFormat.h>
|
2022-06-23 20:04:06 +00:00
|
|
|
#include <Processors/Formats/IInputFormat.h>
|
|
|
|
#include <Processors/Formats/ISchemaReader.h>
|
|
|
|
#include <Processors/Sources/NullSource.h>
|
|
|
|
#include <Processors/Executors/PullingPipelineExecutor.h>
|
2023-04-06 22:17:09 +00:00
|
|
|
#include <Processors/ResizeProcessor.h>
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/escapeForFileName.h>
|
2023-05-29 20:08:18 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2019-07-21 13:15:04 +00:00
|
|
|
#include <Common/parseGlobs.h>
|
2021-10-17 08:42:36 +00:00
|
|
|
#include <Common/filesystemHelpers.h>
|
2023-05-07 12:18:52 +00:00
|
|
|
#include <Common/logger_useful.h>
|
2022-12-11 21:15:41 +00:00
|
|
|
#include <Common/ProfileEvents.h>
|
2022-06-23 20:04:06 +00:00
|
|
|
|
|
|
|
#include <QueryPipeline/Pipe.h>
|
|
|
|
#include <QueryPipeline/QueryPipelineBuilder.h>
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2021-07-06 10:23:39 +00:00
|
|
|
#include <sys/stat.h>
|
2016-10-25 13:49:07 +00:00
|
|
|
#include <fcntl.h>
|
2020-01-05 02:57:09 +00:00
|
|
|
#include <unistd.h>
|
2019-07-21 13:15:04 +00:00
|
|
|
#include <re2/re2.h>
|
2019-08-27 15:20:31 +00:00
|
|
|
#include <filesystem>
|
2023-01-12 15:51:04 +00:00
|
|
|
#include <shared_mutex>
|
2023-04-25 11:27:20 +00:00
|
|
|
#include <cmath>
|
2023-05-11 12:44:36 +00:00
|
|
|
#include <algorithm>
|
2019-07-21 13:15:04 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
|
2022-12-11 21:15:41 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
|
|
|
extern const Event CreatedReadBufferOrdinary;
|
|
|
|
extern const Event CreatedReadBufferMMap;
|
|
|
|
extern const Event CreatedReadBufferMMapFailed;
|
|
|
|
}
|
|
|
|
|
2019-08-27 15:20:31 +00:00
|
|
|
namespace fs = std::filesystem;
|
2019-07-21 13:15:04 +00:00
|
|
|
|
2016-10-18 14:18:37 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-10-28 17:38:32 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2020-02-25 18:02:41 +00:00
|
|
|
extern const int BAD_ARGUMENTS;
|
|
|
|
extern const int NOT_IMPLEMENTED;
|
2021-08-22 17:21:49 +00:00
|
|
|
extern const int CANNOT_FSTAT;
|
2020-01-05 02:57:09 +00:00
|
|
|
extern const int CANNOT_TRUNCATE_FILE;
|
2016-11-11 17:01:02 +00:00
|
|
|
extern const int DATABASE_ACCESS_DENIED;
|
2017-12-30 00:36:06 +00:00
|
|
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
|
|
|
extern const int UNKNOWN_IDENTIFIER;
|
2017-11-03 19:53:10 +00:00
|
|
|
extern const int INCORRECT_FILE_NAME;
|
2018-04-10 08:54:31 +00:00
|
|
|
extern const int FILE_DOESNT_EXIST;
|
2023-05-07 12:18:52 +00:00
|
|
|
extern const int FILE_ALREADY_EXISTS;
|
2020-09-24 23:29:16 +00:00
|
|
|
extern const int TIMEOUT_EXCEEDED;
|
2020-10-14 12:19:29 +00:00
|
|
|
extern const int INCOMPATIBLE_COLUMNS;
|
2021-07-09 09:20:11 +00:00
|
|
|
extern const int CANNOT_STAT;
|
2021-10-28 13:56:45 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2021-12-29 18:03:15 +00:00
|
|
|
extern const int CANNOT_APPEND_TO_FILE;
|
2021-12-15 11:30:57 +00:00
|
|
|
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
|
2023-01-17 22:46:04 +00:00
|
|
|
extern const int CANNOT_COMPILE_REGEXP;
|
2018-08-10 04:02:56 +00:00
|
|
|
}
|
2016-10-28 17:38:32 +00:00
|
|
|
|
2019-08-02 15:00:12 +00:00
|
|
|
namespace
|
|
|
|
{
|
2019-09-22 22:13:42 +00:00
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
/// Forward-declare to use in listFilesWithFoldedRegexpMatchingImpl()
|
|
|
|
void listFilesWithRegexpMatchingImpl(
|
|
|
|
const std::string & path_for_ls,
|
|
|
|
const std::string & for_match,
|
|
|
|
size_t & total_bytes_to_read,
|
|
|
|
std::vector<std::string> & result,
|
|
|
|
bool recursive = false);
|
|
|
|
|
2023-06-15 11:09:38 +00:00
|
|
|
/*
|
|
|
|
* When `{...}` has any `/`s, it must be processed in a different way:
|
2023-06-19 21:06:08 +00:00
|
|
|
* Basically, a path with globs is processed by listFilesWithRegexpMatchingImpl. In case it detects multi-dir glob {.../..., .../...},
|
|
|
|
* listFilesWithFoldedRegexpMatchingImpl is in charge from now on.
|
2023-06-15 11:09:38 +00:00
|
|
|
* It works a bit different: it still recursively goes through subdirectories, but does not match every directory to glob.
|
|
|
|
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
|
|
|
|
* StorageHDFS.cpp has the same logic.
|
|
|
|
*/
|
2023-06-10 23:50:17 +00:00
|
|
|
void listFilesWithFoldedRegexpMatchingImpl(const std::string & path_for_ls,
|
|
|
|
const std::string & processed_suffix,
|
2023-06-05 01:21:43 +00:00
|
|
|
const std::string & suffix_with_globs,
|
2023-06-10 23:50:17 +00:00
|
|
|
re2::RE2 & matcher,
|
|
|
|
size_t & total_bytes_to_read,
|
|
|
|
const size_t max_depth,
|
|
|
|
const size_t next_slash_after_glob_pos,
|
2023-06-05 01:21:43 +00:00
|
|
|
std::vector<std::string> & result)
|
|
|
|
{
|
|
|
|
if (!max_depth)
|
|
|
|
return;
|
|
|
|
|
|
|
|
const fs::directory_iterator end;
|
2023-06-10 23:50:17 +00:00
|
|
|
for (fs::directory_iterator it(path_for_ls); it != end; ++it)
|
2023-06-05 01:21:43 +00:00
|
|
|
{
|
|
|
|
const std::string full_path = it->path().string();
|
|
|
|
const size_t last_slash = full_path.rfind('/');
|
|
|
|
const String dir_or_file_name = full_path.substr(last_slash);
|
|
|
|
|
|
|
|
if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
|
|
|
|
{
|
|
|
|
if (next_slash_after_glob_pos == std::string::npos)
|
|
|
|
{
|
|
|
|
total_bytes_to_read += it->file_size();
|
|
|
|
result.push_back(it->path().string());
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2023-06-12 09:41:36 +00:00
|
|
|
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "" ,
|
2023-06-05 01:21:43 +00:00
|
|
|
suffix_with_globs.substr(next_slash_after_glob_pos),
|
|
|
|
total_bytes_to_read, result);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (it->is_directory())
|
|
|
|
{
|
2023-06-12 09:41:36 +00:00
|
|
|
listFilesWithFoldedRegexpMatchingImpl(fs::path(full_path), processed_suffix + dir_or_file_name,
|
2023-06-19 21:06:08 +00:00
|
|
|
suffix_with_globs, matcher, total_bytes_to_read,
|
|
|
|
max_depth - 1, next_slash_after_glob_pos, result);
|
2023-06-05 01:21:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-10 16:00:01 +00:00
|
|
|
/* Recursive directory listing with matched paths as a result.
|
|
|
|
* Have the same method in StorageHDFS.
|
|
|
|
*/
|
2022-04-25 21:44:43 +00:00
|
|
|
void listFilesWithRegexpMatchingImpl(
|
|
|
|
const std::string & path_for_ls,
|
|
|
|
const std::string & for_match,
|
|
|
|
size_t & total_bytes_to_read,
|
2022-10-17 07:04:25 +00:00
|
|
|
std::vector<std::string> & result,
|
2023-06-05 01:21:43 +00:00
|
|
|
bool recursive)
|
2019-08-02 15:00:12 +00:00
|
|
|
{
|
2023-06-05 01:21:43 +00:00
|
|
|
const size_t first_glob_pos = for_match.find_first_of("*?{");
|
|
|
|
const bool has_glob = first_glob_pos != std::string::npos;
|
2019-08-05 23:10:19 +00:00
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
|
2019-08-30 15:19:05 +00:00
|
|
|
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
|
2019-08-08 14:26:02 +00:00
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
/// slashes_in_glob counter is a upper-bound estimate of recursion depth
|
|
|
|
/// needed to process complex cases when `/` is included into glob, e.g. /pa{th1/a,th2/b}.csv
|
|
|
|
size_t slashes_in_glob = 0;
|
2023-06-13 16:47:02 +00:00
|
|
|
const size_t next_slash_after_glob_pos = [&]()
|
2023-05-29 20:08:18 +00:00
|
|
|
{
|
2023-06-05 01:21:43 +00:00
|
|
|
if (!has_glob)
|
|
|
|
return suffix_with_globs.find('/', 1);
|
|
|
|
|
|
|
|
size_t in_curly = 0;
|
2023-06-13 16:47:02 +00:00
|
|
|
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
|
|
|
|
{
|
2023-06-05 01:21:43 +00:00
|
|
|
if (*it == '{')
|
|
|
|
++in_curly;
|
|
|
|
else if (*it == '/')
|
|
|
|
{
|
|
|
|
if (in_curly)
|
|
|
|
++slashes_in_glob;
|
|
|
|
else
|
|
|
|
return size_t(std::distance(suffix_with_globs.begin(), it));
|
|
|
|
}
|
|
|
|
else if (*it == '}')
|
|
|
|
--in_curly;
|
|
|
|
}
|
|
|
|
return std::string::npos;
|
|
|
|
}();
|
2019-08-08 14:26:02 +00:00
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);
|
2022-10-17 07:04:25 +00:00
|
|
|
|
|
|
|
auto regexp = makeRegexpPatternFromGlobs(current_glob);
|
2019-08-05 23:10:19 +00:00
|
|
|
|
2020-01-05 20:11:26 +00:00
|
|
|
re2::RE2 matcher(regexp);
|
2023-01-17 22:46:04 +00:00
|
|
|
if (!matcher.ok())
|
|
|
|
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
|
|
|
|
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
|
2022-10-17 07:04:25 +00:00
|
|
|
|
|
|
|
bool skip_regex = current_glob == "/*" ? true : false;
|
|
|
|
if (!recursive)
|
|
|
|
recursive = current_glob == "/**" ;
|
|
|
|
|
2019-08-30 15:19:05 +00:00
|
|
|
const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);
|
2021-04-27 00:05:43 +00:00
|
|
|
|
|
|
|
if (!fs::exists(prefix_without_globs))
|
2022-04-25 21:44:43 +00:00
|
|
|
return;
|
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;
|
2023-05-29 20:08:18 +00:00
|
|
|
|
2023-06-05 01:21:43 +00:00
|
|
|
if (slashes_in_glob)
|
|
|
|
{
|
2023-06-10 23:50:17 +00:00
|
|
|
listFilesWithFoldedRegexpMatchingImpl(fs::path(prefix_without_globs), "", suffix_with_globs,
|
2023-06-19 21:06:08 +00:00
|
|
|
matcher, total_bytes_to_read, slashes_in_glob,
|
2023-06-05 01:21:43 +00:00
|
|
|
next_slash_after_glob_pos, result);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2019-08-30 15:19:05 +00:00
|
|
|
const fs::directory_iterator end;
|
|
|
|
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
|
2019-08-02 15:00:12 +00:00
|
|
|
{
|
2019-08-30 15:19:05 +00:00
|
|
|
const std::string full_path = it->path().string();
|
|
|
|
const size_t last_slash = full_path.rfind('/');
|
|
|
|
const String file_name = full_path.substr(last_slash);
|
2022-04-25 21:44:43 +00:00
|
|
|
|
2019-08-10 16:00:01 +00:00
|
|
|
/// Condition is_directory means what kind of path is it in current iteration of ls
|
2022-04-25 21:44:43 +00:00
|
|
|
if (!it->is_directory() && !looking_for_directory)
|
2019-08-02 15:00:12 +00:00
|
|
|
{
|
2022-10-17 07:04:25 +00:00
|
|
|
if (skip_regex || re2::RE2::FullMatch(file_name, matcher))
|
|
|
|
{
|
2022-04-25 21:44:43 +00:00
|
|
|
total_bytes_to_read += it->file_size();
|
2019-08-02 15:00:12 +00:00
|
|
|
result.push_back(it->path().string());
|
2022-10-17 07:04:25 +00:00
|
|
|
}
|
2019-08-02 15:00:12 +00:00
|
|
|
}
|
2022-10-17 07:04:25 +00:00
|
|
|
else if (it->is_directory())
|
2019-08-02 15:00:12 +00:00
|
|
|
{
|
2022-10-17 07:04:25 +00:00
|
|
|
if (recursive)
|
2019-08-02 15:00:12 +00:00
|
|
|
{
|
2023-07-25 04:10:04 +00:00
|
|
|
listFilesWithRegexpMatchingImpl(fs::path(full_path).append(it->path().string()) / "",
|
|
|
|
looking_for_directory ? suffix_with_globs.substr(next_slash_after_glob_pos) : current_glob,
|
2022-10-17 07:04:25 +00:00
|
|
|
total_bytes_to_read, result, recursive);
|
2019-08-02 15:00:12 +00:00
|
|
|
}
|
2022-10-17 07:04:25 +00:00
|
|
|
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
|
2019-09-23 14:50:33 +00:00
|
|
|
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
|
2023-06-05 01:21:43 +00:00
|
|
|
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash_after_glob_pos), total_bytes_to_read, result);
|
2019-08-02 15:00:12 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-25 21:44:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<std::string> listFilesWithRegexpMatching(
|
|
|
|
const std::string & path_for_ls,
|
|
|
|
const std::string & for_match,
|
|
|
|
size_t & total_bytes_to_read)
|
|
|
|
{
|
|
|
|
std::vector<std::string> result;
|
|
|
|
listFilesWithRegexpMatchingImpl(path_for_ls, for_match, total_bytes_to_read, result);
|
2019-08-02 15:00:12 +00:00
|
|
|
return result;
|
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2020-03-09 01:03:43 +00:00
|
|
|
std::string getTablePath(const std::string & table_dir_path, const std::string & format_name)
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2019-10-25 19:07:47 +00:00
|
|
|
return table_dir_path + "/data." + escapeForFileName(format_name);
|
2016-11-11 17:01:02 +00:00
|
|
|
}
|
|
|
|
|
2018-04-19 04:39:16 +00:00
|
|
|
/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
|
2022-04-25 21:44:43 +00:00
|
|
|
void checkCreationIsAllowed(
|
|
|
|
ContextPtr context_global,
|
|
|
|
const std::string & db_dir_path,
|
|
|
|
const std::string & table_path,
|
|
|
|
bool can_be_directory)
|
2016-11-11 17:01:02 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
if (context_global->getApplicationType() != Context::ApplicationType::SERVER)
|
2018-04-06 09:53:29 +00:00
|
|
|
return;
|
|
|
|
|
2019-12-29 07:03:39 +00:00
|
|
|
/// "/dev/null" is allowed for perf testing
|
2021-10-17 08:42:36 +00:00
|
|
|
if (!fileOrSymlinkPathStartsWith(table_path, db_dir_path) && table_path != "/dev/null")
|
|
|
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "File `{}` is not inside `{}`", table_path, db_dir_path);
|
2018-04-10 07:09:50 +00:00
|
|
|
|
2022-04-25 21:44:43 +00:00
|
|
|
if (can_be_directory)
|
|
|
|
{
|
|
|
|
auto table_path_stat = fs::status(table_path);
|
|
|
|
if (fs::exists(table_path_stat) && fs::is_directory(table_path_stat))
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "File must not be a directory");
|
2022-04-25 21:44:43 +00:00
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2021-10-28 13:56:45 +00:00
|
|
|
|
2022-12-12 05:39:08 +00:00
|
|
|
std::unique_ptr<ReadBuffer> selectReadBuffer(
|
2021-12-15 11:30:57 +00:00
|
|
|
const String & current_path,
|
|
|
|
bool use_table_fd,
|
|
|
|
int table_fd,
|
2022-12-12 05:39:08 +00:00
|
|
|
const struct stat & file_stat,
|
2021-12-15 11:30:57 +00:00
|
|
|
ContextPtr context)
|
|
|
|
{
|
2022-12-15 23:08:19 +00:00
|
|
|
auto read_method = context->getSettingsRef().storage_file_read_method;
|
2022-12-11 21:15:41 +00:00
|
|
|
|
2023-06-24 19:14:28 +00:00
|
|
|
/** Using mmap on server-side is unsafe for the following reasons:
|
2023-05-10 01:16:52 +00:00
|
|
|
* - concurrent modifications of a file will result in SIGBUS;
|
|
|
|
* - IO error from the device will result in SIGBUS;
|
|
|
|
* - recovery from this signal is not feasible even with the usage of siglongjmp,
|
|
|
|
* as it might require stack unwinding from arbitrary place;
|
|
|
|
* - arbitrary slowdown due to page fault in arbitrary place in the code is difficult to debug.
|
|
|
|
*
|
|
|
|
* But we keep this mode for clickhouse-local as it is not so bad for a command line tool.
|
|
|
|
*/
|
2023-06-24 19:14:28 +00:00
|
|
|
if (context->getApplicationType() == Context::ApplicationType::SERVER && read_method == LocalFSReadMethod::mmap)
|
|
|
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Using storage_file_read_method=mmap is not safe in server mode. Consider using pread.");
|
2023-05-10 01:16:52 +00:00
|
|
|
|
2023-06-24 19:14:28 +00:00
|
|
|
if (S_ISREG(file_stat.st_mode) && read_method == LocalFSReadMethod::mmap)
|
2022-12-11 21:15:41 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2022-12-12 05:39:08 +00:00
|
|
|
std::unique_ptr<ReadBufferFromFileBase> res;
|
2022-12-11 21:15:41 +00:00
|
|
|
if (use_table_fd)
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<MMapReadBufferFromFileDescriptor>(table_fd, 0);
|
2022-12-11 21:15:41 +00:00
|
|
|
else
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<MMapReadBufferFromFile>(current_path, 0);
|
2022-12-11 21:15:41 +00:00
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap);
|
2022-12-12 05:39:08 +00:00
|
|
|
return res;
|
2022-12-11 21:15:41 +00:00
|
|
|
}
|
|
|
|
catch (const ErrnoException &)
|
|
|
|
{
|
|
|
|
/// Fallback if mmap is not supported.
|
|
|
|
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMapFailed);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-12-12 05:39:08 +00:00
|
|
|
std::unique_ptr<ReadBufferFromFileBase> res;
|
|
|
|
if (S_ISREG(file_stat.st_mode) && (read_method == LocalFSReadMethod::pread || read_method == LocalFSReadMethod::mmap))
|
2022-12-11 21:15:41 +00:00
|
|
|
{
|
2023-05-29 20:08:18 +00:00
|
|
|
if (use_table_fd)
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<ReadBufferFromFileDescriptorPRead>(table_fd);
|
2022-12-11 21:15:41 +00:00
|
|
|
else
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<ReadBufferFromFilePRead>(current_path, context->getSettingsRef().max_read_buffer_size);
|
2021-12-15 11:30:57 +00:00
|
|
|
|
2022-12-11 21:15:41 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
|
|
|
|
}
|
2022-12-12 05:39:08 +00:00
|
|
|
else
|
2022-12-11 21:15:41 +00:00
|
|
|
{
|
|
|
|
if (use_table_fd)
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<ReadBufferFromFileDescriptor>(table_fd);
|
2023-05-29 20:08:18 +00:00
|
|
|
else
|
2022-12-12 05:39:08 +00:00
|
|
|
res = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
|
2021-12-15 11:30:57 +00:00
|
|
|
|
2022-12-11 21:15:41 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
|
|
|
|
}
|
2022-12-12 05:39:08 +00:00
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2023-05-30 19:32:24 +00:00
|
|
|
struct stat getFileStat(const String & current_path, bool use_table_fd, int table_fd, const String & storage_name)
|
2021-12-15 11:30:57 +00:00
|
|
|
{
|
|
|
|
struct stat file_stat{};
|
|
|
|
if (use_table_fd)
|
|
|
|
{
|
|
|
|
/// Check if file descriptor allows random reads (and reading it twice).
|
|
|
|
if (0 != fstat(table_fd, &file_stat))
|
|
|
|
throwFromErrno("Cannot stat table file descriptor, inside " + storage_name, ErrorCodes::CANNOT_STAT);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Check if file descriptor allows random reads (and reading it twice).
|
|
|
|
if (0 != stat(current_path.c_str(), &file_stat))
|
|
|
|
throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT);
|
2023-05-30 19:32:24 +00:00
|
|
|
}
|
2021-12-15 11:30:57 +00:00
|
|
|
|
2023-05-30 19:32:24 +00:00
|
|
|
return file_stat;
|
|
|
|
}
|
2021-12-15 11:30:57 +00:00
|
|
|
|
2023-05-30 19:32:24 +00:00
|
|
|
std::unique_ptr<ReadBuffer> createReadBuffer(
|
|
|
|
const String & current_path,
|
|
|
|
const struct stat & file_stat,
|
|
|
|
bool use_table_fd,
|
|
|
|
int table_fd,
|
|
|
|
const String & compression_method,
|
2023-07-28 08:49:00 +00:00
|
|
|
ContextPtr context,
|
2023-07-31 12:04:27 +00:00
|
|
|
const String & path_to_archive = "")
|
2023-05-30 19:32:24 +00:00
|
|
|
{
|
|
|
|
CompressionMethod method;
|
2022-12-11 21:15:41 +00:00
|
|
|
|
2023-07-31 12:04:27 +00:00
|
|
|
if (!path_to_archive.empty())
|
2023-07-28 08:49:00 +00:00
|
|
|
{
|
|
|
|
auto reader = createArchiveReader(path_to_archive);
|
|
|
|
std::unique_ptr<ReadBuffer> in = reader->readFile(current_path);
|
|
|
|
return in;
|
2021-12-15 11:30:57 +00:00
|
|
|
}
|
|
|
|
|
2023-05-30 19:32:24 +00:00
|
|
|
if (use_table_fd)
|
|
|
|
method = chooseCompressionMethod("", compression_method);
|
|
|
|
else
|
2021-12-15 11:30:57 +00:00
|
|
|
method = chooseCompressionMethod(current_path, compression_method);
|
|
|
|
|
2022-12-12 05:39:08 +00:00
|
|
|
std::unique_ptr<ReadBuffer> nested_buffer = selectReadBuffer(current_path, use_table_fd, table_fd, file_stat, context);
|
2022-12-11 21:15:41 +00:00
|
|
|
|
2022-10-07 10:46:45 +00:00
|
|
|
int zstd_window_log_max = static_cast<int>(context->getSettingsRef().zstd_window_log_max);
|
2022-06-18 12:55:35 +00:00
|
|
|
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method, zstd_window_log_max);
|
2021-12-15 11:30:57 +00:00
|
|
|
}
|
|
|
|
|
2019-09-06 18:29:41 +00:00
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2021-04-26 13:34:44 +00:00
|
|
|
/** Resolves table_path into the list of paths the storage works with.
  * A relative table_path is resolved against user_files_path.
  *  - A path with the partition id wildcard is returned as is (one element).
  *  - A path without globs is returned as is; its size, if it can be obtained, is added to total_bytes_to_read.
  *  - A path with globs is expanded by listFilesWithRegexpMatching, which also accumulates total_bytes_to_read.
  * Every resulting path is validated with checkCreationIsAllowed.
  */
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
{
    const fs::path user_files_absolute_path = fs::weakly_canonical(user_files_path);

    fs::path resolved_table_path(table_path);
    if (resolved_table_path.is_relative())
        resolved_table_path = user_files_absolute_path / resolved_table_path;

    /// Do not use fs::canonical or fs::weakly_canonical.
    /// Otherwise it will not allow to work with symlinks in `user_files_path` directory.
    const String path = fs::absolute(resolved_table_path).lexically_normal(); /// Normalize path.

    const bool has_partition_wildcard = path.find(PartitionedSink::PARTITION_ID_WILDCARD) != std::string::npos;
    const bool has_globs = path.find_first_of("*?{") != std::string::npos;

    Strings paths;
    bool can_be_directory = true;

    if (has_partition_wildcard)
    {
        paths.push_back(path);
    }
    else if (has_globs)
    {
        /// We list only non-directory files.
        paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
        can_be_directory = false;
    }
    else
    {
        /// A single plain path: account its size only if it can be obtained.
        std::error_code size_error;
        const size_t size = fs::file_size(path, size_error);
        if (!size_error)
            total_bytes_to_read += size;

        paths.push_back(path);
    }

    for (const auto & listed_path : paths)
        checkCreationIsAllowed(context, user_files_absolute_path, listed_path, can_be_directory);

    return paths;
}
|
|
|
|
|
2022-01-24 18:41:44 +00:00
|
|
|
/** Infers the table structure from the data available in table_fd.
  *
  * A file descriptor is a stream of data: after some of it has been read for schema inference,
  * reading cannot be restarted from the beginning. So the buffer over the descriptor is wrapped into
  * a PeekableReadBuffer with a checkpoint, schema inference reads what it needs, and then the buffer
  * is rolled back to the checkpoint to be reused on the first read from the storage.
  */
ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr context)
{
    ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &) -> std::unique_ptr<ReadBuffer>
    {
        /// The original buffer has to outlive the peekable wrapper, so it is stored in the storage itself.
        auto fd_stat = getFileStat("", true, table_fd, getName());
        read_buffer_from_fd = createReadBuffer("", fd_stat, true, table_fd, compression_method, context);

        auto peekable_buffer = std::make_unique<PeekableReadBuffer>(*read_buffer_from_fd);
        peekable_buffer->setCheckpoint();
        return peekable_buffer;
    };

    auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, false, context, peekable_read_buffer_from_fd);

    if (!peekable_read_buffer_from_fd)
        return columns;

    /// If we have created read buffer in readSchemaFromFormat we should rollback to checkpoint.
    auto * peekable_buffer = assert_cast<PeekableReadBuffer *>(peekable_read_buffer_from_fd.get());
    peekable_buffer->rollbackToCheckpoint();
    has_peekable_read_buffer_from_fd = true;

    return columns;
}
|
|
|
|
|
|
|
|
/** Infers the table structure from the given files.
  *
  * For the "Distributed" format the header of the first file is used.
  * Otherwise the schema is taken from the schema cache (if schema_inference_use_cache_for_file is enabled and
  * the cache has it) or inferred by readSchemaFromFormat, which pulls read buffers from an iterator over the files.
  * If paths_to_archive is not empty, paths are treated as names of files inside the archives.
  *
  * Throws INCORRECT_FILE_NAME for the "Distributed" format without files, and CANNOT_EXTRACT_TABLE_STRUCTURE
  * if there are no files (and the format has no external schema reader) or if all files were skipped as empty.
  */
ColumnsDescription StorageFile::getTableStructureFromFile(
    const String & format,
    const std::vector<String> & paths,
    const String & compression_method,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr context,
    const std::vector<String> & paths_to_archive)
{
    if (format == "Distributed")
    {
        if (paths.empty())
            throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Cannot get table structure from file, because no files match specified name");

        return ColumnsDescription(DistributedAsyncInsertSource(paths[0]).getOutputs().front().getHeader().getNamesAndTypesList());
    }

    const bool format_has_external_schema_reader = FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format);
    if (paths.empty() && !format_has_external_schema_reader)
        throw Exception(
            ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
            "Cannot extract table structure from {} format file, because there are no files with provided path. "
            "You must specify table structure manually", format);

    const auto & settings = context->getSettingsRef();

    std::optional<ColumnsDescription> columns_from_cache;
    if (settings.schema_inference_use_cache_for_file)
        columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);

    ReadBufferIterator read_buffer_iterator;
    if (!paths_to_archive.empty())
    {
        /// NOTE(review): neither path_it nor archive_it is ever advanced, so every call returns a buffer
        /// for the same (first) file of the same (first) archive — confirm the iterator is called only once here.
        read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
        {
            if (archive_it == paths_to_archive.end())
                return nullptr;

            /// The stat is taken from the archive itself, not from the file inside it.
            auto archive_stat = getFileStat(*archive_it, false, -1, "File");

            return createReadBuffer(*path_it, archive_stat, false, -1, compression_method, context, *archive_it);
        };
    }
    else
    {
        read_buffer_iterator = [&, next_path = paths.begin(), nothing_returned_yet = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
        {
            String path;
            struct stat file_stat;

            /// Take the next file; empty files are skipped if engine_file_skip_empty_files is enabled.
            while (true)
            {
                if (next_path == paths.end())
                {
                    /// Running out of files before returning any buffer means there was nothing to infer from.
                    if (nothing_returned_yet)
                        throw Exception(
                            ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
                            "Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
                            format);
                    return nullptr;
                }

                path = *next_path++;
                file_stat = getFileStat(path, false, -1, "File");

                const bool skip_as_empty = context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0;
                if (!skip_as_empty)
                    break;
            }

            nothing_returned_yet = false;
            return createReadBuffer(path, file_stat, false, -1, compression_method, context);
        };
    }

    ColumnsDescription columns;
    if (columns_from_cache)
        columns = *columns_from_cache;
    else
        columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);

    /// The cache is updated regardless of where the columns came from.
    if (settings.schema_inference_use_cache_for_file)
        addColumnsToCache(paths, columns, format, format_settings, context);

    return columns;
}
|
|
|
|
|
2022-05-13 18:39:19 +00:00
|
|
|
bool StorageFile::supportsSubsetOfColumns() const
|
2021-03-31 14:21:19 +00:00
|
|
|
{
|
2022-05-13 18:39:19 +00:00
|
|
|
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
|
2021-03-31 14:21:19 +00:00
|
|
|
}
|
|
|
|
|
2023-04-29 02:29:51 +00:00
|
|
|
bool StorageFile::prefersLargeBlocks() const
|
|
|
|
{
|
|
|
|
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name);
|
|
|
|
}
|
|
|
|
|
2023-05-05 04:18:46 +00:00
|
|
|
/// Asks FormatFactory whether the output should be parallelized after reading data in the format of this storage.
bool StorageFile::parallelizeOutputAfterReading(ContextPtr context) const
{
    auto & format_factory = FormatFactory::instance();
    return format_factory.checkParallelizeOutputAfterReading(format_name, context);
}
|
|
|
|
|
2019-10-30 14:17:55 +00:00
|
|
|
/** Creates a storage that reads from / writes to an already opened file descriptor.
  *
  * The size reported by fstat for the descriptor is stored in total_bytes_to_read.
  * Throws CANNOT_FSTAT if fstat fails, DATABASE_ACCESS_DENIED if the application is a server,
  * and INCORRECT_FILE_NAME for the "Distributed" format, which requires an explicit file path.
  */
StorageFile::StorageFile(int table_fd_, CommonArguments args)
    : StorageFile(args)
{
    struct stat buf;
    int res = fstat(table_fd_, &buf);
    if (-1 == res)
    {
        /// The second argument of throwFromErrno is the error code (see its other usages in this file);
        /// the return value of fstat must not be passed in its place.
        throwFromErrno("Cannot execute fstat", ErrorCodes::CANNOT_FSTAT);
    }
    total_bytes_to_read = buf.st_size;

    if (args.getContext()->getApplicationType() == Context::ApplicationType::SERVER)
        throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Using file descriptor as source of storage isn't allowed for server daemons");
    if (args.format_name == "Distributed")
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Distributed format is allowed only with explicit file path");

    is_db_table = false;
    use_table_fd = true;
    table_fd = table_fd_;
    setStorageMetadata(args);
}
|
2017-11-03 19:53:10 +00:00
|
|
|
|
2019-12-11 20:05:53 +00:00
|
|
|
/// Creates a storage over an explicit path (possibly with globs) or over one file inside archive(s).
///
/// With a non-empty `args.path_to_archive` the archive path is expanded into `paths_to_archive`
/// and `table_path_` is kept as the single entry of `paths`; otherwise `table_path_` itself is expanded.
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
    : StorageFile(args)
{
    const bool reads_from_archive = !args.path_to_archive.empty();
    if (reads_from_archive)
    {
        paths_to_archive = getPathsList(args.path_to_archive, user_files_path, args.getContext(), total_bytes_to_read);
        paths = {table_path_};
    }
    else
    {
        paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
    }

    is_db_table = false;
    is_path_with_globs = paths.size() > 1;

    /// Partitioned writes use the first resolved path, falling back to the raw argument when nothing was resolved.
    if (paths.empty())
        path_for_partitioned_write = table_path_;
    else
        path_for_partitioned_write = paths.front();

    file_renamer = FileRenamer(args.rename_after_processing);

    setStorageMetadata(args);
}
|
2018-04-06 09:53:29 +00:00
|
|
|
|
2019-10-30 14:17:55 +00:00
|
|
|
/// Creates a storage whose single data file lives in a table directory under `base_path`.
///
/// The directory is created if missing. Throws INCORRECT_FILE_NAME when the relative directory
/// is empty or when the format is "Distributed" (it requires an explicit file path).
StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args)
    : StorageFile(args)
{
    if (relative_table_dir_path.empty())
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Storage {} requires data path", getName());
    if (args.format_name == "Distributed")
        throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Distributed format is allowed only with explicit file path");

    /// The trailing empty component makes the resulting path end with a directory separator.
    String data_dir = fs::path(base_path) / relative_table_dir_path / "";
    fs::create_directories(data_dir);
    paths = {getTablePath(data_dir, format_name)};

    /// The size is only recorded when it can be obtained; a failure of file_size() is ignored.
    std::error_code ec;
    size_t file_bytes = fs::file_size(paths.front(), ec);
    if (!ec)
        total_bytes_to_read = file_bytes;

    setStorageMetadata(args);
}
|
|
|
|
|
2019-10-30 14:17:55 +00:00
|
|
|
/// Common part of all constructors: copies the format / compression parameters from `args`
/// and validates the format name ("Distributed" is exempt from the format factory check).
StorageFile::StorageFile(CommonArguments args)
    : IStorage(args.table_id)
    , format_name(args.format_name)
    , format_settings(args.format_settings)
    , compression_method(args.compression_method)
    , base_path(args.getContext()->getPath())
{
    const bool is_distributed_format = (format_name == "Distributed");
    if (!is_distributed_format)
        FormatFactory::instance().checkFormatName(format_name);
}
|
|
|
|
|
|
|
|
/// Builds the in-memory metadata (columns, constraints, comment) of the table.
///
/// Columns are taken from `args` unless they are missing or the format is "Distributed";
/// in that case they are obtained from the file descriptor or from the file(s).
/// Throws INCOMPATIBLE_COLUMNS when explicitly given columns differ from the ones read from the file(s).
void StorageFile::setStorageMetadata(CommonArguments args)
{
    StorageInMemoryMetadata storage_metadata;

    const bool structure_from_data = args.format_name == "Distributed" || args.columns.empty();

    if (!structure_from_data)
    {
        storage_metadata.setColumns(args.columns);
    }
    else if (use_table_fd)
    {
        ColumnsDescription detected = getTableStructureFromFileDescriptor(args.getContext());
        storage_metadata.setColumns(detected);
    }
    else
    {
        ColumnsDescription detected
            = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext(), paths_to_archive);

        /// Reached with non-empty args.columns only for the "Distributed" format.
        if (!args.columns.empty() && args.columns != detected)
            throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Table structure and file structure are different");

        storage_metadata.setColumns(detected);
    }

    storage_metadata.setConstraints(args.constraints);
    storage_metadata.setComment(args.comment);
    setInMemoryMetadata(storage_metadata);
}
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2020-09-24 23:29:16 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
static std::chrono::seconds getLockTimeout(ContextPtr context)
|
2020-09-24 23:29:16 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
const Settings & settings = context->getSettingsRef();
|
2020-09-24 23:29:16 +00:00
|
|
|
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
|
|
|
|
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
|
|
|
|
lock_timeout = settings.max_execution_time.totalSeconds();
|
|
|
|
return std::chrono::seconds{lock_timeout};
|
|
|
|
}
|
|
|
|
|
2021-03-30 17:57:21 +00:00
|
|
|
/// Shared-ownership handle to a StorageFile.
using StorageFilePtr = std::shared_ptr<StorageFile>;
|
|
|
|
|
2020-09-24 23:29:16 +00:00
|
|
|
|
2022-05-20 19:49:31 +00:00
|
|
|
class StorageFileSource : public ISource
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
|
|
|
public:
|
2020-01-31 13:12:11 +00:00
|
|
|
struct FilesInfo
|
|
|
|
{
|
|
|
|
std::vector<std::string> files;
|
2023-05-25 00:00:32 +00:00
|
|
|
std::vector<std::string> paths_to_archive;
|
2020-01-31 13:12:11 +00:00
|
|
|
|
|
|
|
std::atomic<size_t> next_file_to_read = 0;
|
2023-05-25 00:00:32 +00:00
|
|
|
std::atomic<size_t> next_archive_to_read = 0;
|
2020-01-31 13:12:11 +00:00
|
|
|
|
|
|
|
bool need_path_column = false;
|
|
|
|
bool need_file_column = false;
|
2022-05-06 15:04:03 +00:00
|
|
|
|
|
|
|
size_t total_bytes_to_read = 0;
|
2020-01-31 13:12:11 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
|
|
|
|
|
2022-08-06 17:29:33 +00:00
|
|
|
static Block getBlockForSource(const Block & block_for_format, const FilesInfoPtr & files_info)
|
2021-03-01 14:11:25 +00:00
|
|
|
{
|
2022-08-06 17:29:33 +00:00
|
|
|
auto res = block_for_format;
|
|
|
|
if (files_info->need_path_column)
|
|
|
|
{
|
|
|
|
res.insert(
|
2022-02-04 14:13:06 +00:00
|
|
|
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
|
|
|
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
|
|
|
"_path"});
|
2022-08-06 17:29:33 +00:00
|
|
|
}
|
|
|
|
if (files_info->need_file_column)
|
|
|
|
{
|
|
|
|
res.insert(
|
2022-02-04 14:13:06 +00:00
|
|
|
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
|
|
|
|
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
|
|
|
|
"_file"});
|
2022-08-06 17:29:33 +00:00
|
|
|
}
|
|
|
|
return res;
|
2021-03-30 17:57:21 +00:00
|
|
|
}
|
|
|
|
|
2020-01-31 13:12:11 +00:00
|
|
|
StorageFileSource(
|
|
|
|
std::shared_ptr<StorageFile> storage_,
|
2021-07-09 03:15:41 +00:00
|
|
|
const StorageSnapshotPtr & storage_snapshot_,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context_,
|
2020-01-31 13:12:11 +00:00
|
|
|
UInt64 max_block_size_,
|
|
|
|
FilesInfoPtr files_info_,
|
2022-01-24 18:41:44 +00:00
|
|
|
ColumnsDescription columns_description_,
|
2022-08-06 17:29:33 +00:00
|
|
|
const Block & block_for_format_,
|
2022-01-24 18:41:44 +00:00
|
|
|
std::unique_ptr<ReadBuffer> read_buf_)
|
2023-06-22 17:24:43 +00:00
|
|
|
: ISource(getBlockForSource(block_for_format_, files_info_), false)
|
2020-01-31 13:12:11 +00:00
|
|
|
, storage(std::move(storage_))
|
2021-07-09 03:15:41 +00:00
|
|
|
, storage_snapshot(storage_snapshot_)
|
2020-01-31 13:12:11 +00:00
|
|
|
, files_info(std::move(files_info_))
|
2022-01-24 18:41:44 +00:00
|
|
|
, read_buf(std::move(read_buf_))
|
2020-10-02 12:38:50 +00:00
|
|
|
, columns_description(std::move(columns_description_))
|
2022-08-06 17:29:33 +00:00
|
|
|
, block_for_format(block_for_format_)
|
2020-01-31 13:12:11 +00:00
|
|
|
, context(context_)
|
|
|
|
, max_block_size(max_block_size_)
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2021-07-24 16:50:03 +00:00
|
|
|
if (!storage->use_table_fd)
|
2016-10-28 17:38:32 +00:00
|
|
|
{
|
2020-09-24 23:29:16 +00:00
|
|
|
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
|
|
|
|
if (!shared_lock)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
|
2023-05-07 12:18:52 +00:00
|
|
|
storage->readers_counter.fetch_add(1, std::memory_order_release);
|
2016-10-28 17:38:32 +00:00
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2023-05-07 12:18:52 +00:00
|
|
|
|
|
|
|
/**
|
|
|
|
* If specified option --rename_files_after_processing and files created by TableFunctionFile
|
2023-05-07 17:43:34 +00:00
|
|
|
* Last reader will rename files according to specified pattern if desctuctor of reader was called without uncaught exceptions
|
2023-05-07 12:18:52 +00:00
|
|
|
*/
|
|
|
|
void beforeDestroy()
|
|
|
|
{
|
|
|
|
if (storage->file_renamer.isEmpty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
int32_t cnt = storage->readers_counter.fetch_sub(1, std::memory_order_acq_rel);
|
|
|
|
|
|
|
|
if (std::uncaught_exceptions() == 0 && cnt == 1 && !storage->was_renamed)
|
|
|
|
{
|
|
|
|
shared_lock.unlock();
|
|
|
|
auto exclusive_lock = std::unique_lock{storage->rwlock, getLockTimeout(context)};
|
|
|
|
|
|
|
|
if (!exclusive_lock)
|
|
|
|
return;
|
|
|
|
if (storage->readers_counter.load(std::memory_order_acquire) != 0 || storage->was_renamed)
|
|
|
|
return;
|
|
|
|
|
2023-05-07 17:43:34 +00:00
|
|
|
for (auto & file_path_ref : storage->paths)
|
|
|
|
{
|
2023-05-07 12:18:52 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
auto file_path = fs::path(file_path_ref);
|
|
|
|
String new_filename = storage->file_renamer.generateNewFilename(file_path.filename().string());
|
|
|
|
file_path.replace_filename(new_filename);
|
|
|
|
|
|
|
|
// Normalize new path
|
|
|
|
file_path = file_path.lexically_normal();
|
|
|
|
|
|
|
|
// Checking access rights
|
|
|
|
checkCreationIsAllowed(context, context->getUserFilesPath(), file_path, true);
|
|
|
|
|
|
|
|
// Checking an existing of new file
|
|
|
|
if (fs::exists(file_path))
|
|
|
|
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", file_path.string());
|
|
|
|
|
|
|
|
fs::rename(fs::path(file_path_ref), file_path);
|
|
|
|
file_path_ref = file_path.string();
|
|
|
|
storage->was_renamed = true;
|
|
|
|
}
|
|
|
|
catch (const std::exception & e)
|
|
|
|
{
|
|
|
|
// Cannot throw exception from destructor, will write only error
|
|
|
|
LOG_ERROR(&Poco::Logger::get("~StorageFileSource"), "Failed to rename file {}: {}", file_path_ref, e.what());
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
2016-10-28 17:38:32 +00:00
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2023-05-07 12:18:52 +00:00
|
|
|
~StorageFileSource() override
|
|
|
|
{
|
|
|
|
beforeDestroy();
|
|
|
|
}
|
|
|
|
|
2016-10-18 14:18:37 +00:00
|
|
|
String getName() const override
|
|
|
|
{
|
2019-11-13 12:17:31 +00:00
|
|
|
return storage->getName();
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-01-31 13:12:11 +00:00
|
|
|
Chunk generate() override
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2020-01-31 13:12:11 +00:00
|
|
|
while (!finished_generate)
|
2020-01-27 17:06:32 +00:00
|
|
|
{
|
2020-01-31 13:12:11 +00:00
|
|
|
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
|
|
|
|
if (!reader)
|
|
|
|
{
|
|
|
|
if (!storage->use_table_fd)
|
|
|
|
{
|
2023-05-25 00:00:32 +00:00
|
|
|
size_t current_file = 0, current_archive = 0;
|
2023-07-31 12:04:27 +00:00
|
|
|
if (!files_info->paths_to_archive.empty())
|
2023-05-29 20:08:18 +00:00
|
|
|
{
|
2023-07-31 12:04:27 +00:00
|
|
|
if (files_info->files.size() != 1)
|
|
|
|
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Can only read a single file from archive");
|
|
|
|
|
2023-05-25 00:00:32 +00:00
|
|
|
current_archive = files_info->next_archive_to_read.fetch_add(1);
|
|
|
|
if (current_archive >= files_info->paths_to_archive.size())
|
|
|
|
return {};
|
|
|
|
current_path = files_info->files[current_file];
|
|
|
|
current_archive_path = files_info->paths_to_archive[current_archive];
|
2023-05-29 20:08:18 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2023-05-25 00:00:32 +00:00
|
|
|
current_file = files_info->next_file_to_read.fetch_add(1);
|
|
|
|
if (current_file >= files_info->files.size())
|
|
|
|
return {};
|
|
|
|
current_path = files_info->files[current_file];
|
|
|
|
}
|
2020-01-31 13:12:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
/// Special case for distributed format. Defaults are not needed here.
|
|
|
|
if (storage->format_name == "Distributed")
|
|
|
|
{
|
2023-01-21 16:01:41 +00:00
|
|
|
pipeline = std::make_unique<QueryPipeline>(std::make_shared<DistributedAsyncInsertSource>(current_path));
|
2021-07-20 18:18:43 +00:00
|
|
|
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
|
2020-01-31 13:12:11 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-05-29 20:08:18 +00:00
|
|
|
if (!read_buf)
|
|
|
|
{
|
2023-07-31 08:50:09 +00:00
|
|
|
struct stat file_stat;
|
|
|
|
if (files_info->paths_to_archive.empty())
|
|
|
|
file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
|
|
|
|
else
|
|
|
|
file_stat = getFileStat(current_archive_path, storage->use_table_fd, storage->table_fd, storage->getName());
|
|
|
|
|
2023-05-30 19:32:24 +00:00
|
|
|
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
|
|
|
|
continue;
|
2023-07-28 08:49:00 +00:00
|
|
|
|
2023-05-29 20:08:18 +00:00
|
|
|
if (files_info->paths_to_archive.empty())
|
2023-07-28 08:49:00 +00:00
|
|
|
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
|
2023-05-29 20:08:18 +00:00
|
|
|
else
|
|
|
|
read_buf = createReadBuffer(
|
|
|
|
current_path,
|
2023-07-28 08:49:00 +00:00
|
|
|
file_stat,
|
2023-05-29 20:08:18 +00:00
|
|
|
storage->use_table_fd,
|
|
|
|
storage->table_fd,
|
|
|
|
storage->compression_method,
|
|
|
|
context,
|
|
|
|
current_archive_path);
|
2023-05-25 00:00:32 +00:00
|
|
|
}
|
2021-03-30 17:57:21 +00:00
|
|
|
|
2023-04-25 11:27:20 +00:00
|
|
|
const Settings & settings = context->getSettingsRef();
|
2023-04-30 17:01:06 +00:00
|
|
|
chassert(!storage->paths.empty());
|
2023-05-13 08:22:37 +00:00
|
|
|
const auto max_parsing_threads = std::max<size_t>(settings.max_threads/ storage->paths.size(), 1UL);
|
2023-06-16 15:51:18 +00:00
|
|
|
input_format = context->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size, storage->format_settings, max_parsing_threads);
|
2020-05-18 10:00:22 +00:00
|
|
|
|
2021-09-16 17:40:42 +00:00
|
|
|
QueryPipelineBuilder builder;
|
2023-06-16 15:51:18 +00:00
|
|
|
builder.init(Pipe(input_format));
|
2020-01-31 13:12:11 +00:00
|
|
|
|
2020-10-02 12:38:50 +00:00
|
|
|
if (columns_description.hasDefaults())
|
2021-07-20 18:18:43 +00:00
|
|
|
{
|
2021-09-16 17:40:42 +00:00
|
|
|
builder.addSimpleTransform([&](const Block & header)
|
2021-07-20 18:18:43 +00:00
|
|
|
{
|
2023-06-16 15:51:18 +00:00
|
|
|
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *input_format, context);
|
2021-07-20 18:18:43 +00:00
|
|
|
});
|
|
|
|
}
|
2020-01-31 13:12:11 +00:00
|
|
|
|
2022-05-24 20:06:08 +00:00
|
|
|
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
|
2021-09-16 17:40:42 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
|
2020-01-31 13:12:11 +00:00
|
|
|
}
|
2020-01-27 17:06:32 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
Chunk chunk;
|
|
|
|
if (reader->pull(chunk))
|
2020-01-15 07:52:45 +00:00
|
|
|
{
|
2021-07-20 18:18:43 +00:00
|
|
|
UInt64 num_rows = chunk.getNumRows();
|
2023-06-29 11:59:09 +00:00
|
|
|
size_t chunk_size = 0;
|
|
|
|
if (storage->format_name != "Distributed")
|
|
|
|
chunk_size = input_format->getApproxBytesReadForChunk();
|
|
|
|
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
|
2020-01-31 13:12:11 +00:00
|
|
|
|
|
|
|
/// Enrich with virtual columns.
|
|
|
|
if (files_info->need_path_column)
|
|
|
|
{
|
2022-02-04 14:13:06 +00:00
|
|
|
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
|
2021-07-20 18:18:43 +00:00
|
|
|
chunk.addColumn(column->convertToFullColumnIfConst());
|
2020-01-31 13:12:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (files_info->need_file_column)
|
|
|
|
{
|
|
|
|
size_t last_slash_pos = current_path.find_last_of('/');
|
|
|
|
auto file_name = current_path.substr(last_slash_pos + 1);
|
|
|
|
|
2022-02-04 14:13:06 +00:00
|
|
|
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
|
2021-07-20 18:18:43 +00:00
|
|
|
chunk.addColumn(column->convertToFullColumnIfConst());
|
2020-01-31 13:12:11 +00:00
|
|
|
}
|
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
return chunk;
|
2020-01-15 07:52:45 +00:00
|
|
|
}
|
2020-01-27 17:06:32 +00:00
|
|
|
|
2020-01-31 13:12:11 +00:00
|
|
|
/// Read only once for file descriptor.
|
|
|
|
if (storage->use_table_fd)
|
|
|
|
finished_generate = true;
|
|
|
|
|
|
|
|
/// Close file prematurely if stream was ended.
|
2020-01-27 17:06:32 +00:00
|
|
|
reader.reset();
|
2021-07-20 18:18:43 +00:00
|
|
|
pipeline.reset();
|
2023-06-19 13:36:29 +00:00
|
|
|
input_format.reset();
|
2020-01-27 17:06:32 +00:00
|
|
|
read_buf.reset();
|
|
|
|
}
|
|
|
|
|
2020-01-31 13:12:11 +00:00
|
|
|
return {};
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
|
|
|
|
2021-04-26 13:34:44 +00:00
|
|
|
|
2016-10-18 14:18:37 +00:00
|
|
|
private:
|
2019-11-13 12:17:31 +00:00
|
|
|
std::shared_ptr<StorageFile> storage;
|
2021-07-09 03:15:41 +00:00
|
|
|
StorageSnapshotPtr storage_snapshot;
|
2020-01-31 13:12:11 +00:00
|
|
|
FilesInfoPtr files_info;
|
|
|
|
String current_path;
|
2023-05-25 00:00:32 +00:00
|
|
|
String current_archive_path;
|
2016-10-18 14:18:37 +00:00
|
|
|
Block sample_block;
|
2019-11-19 12:46:07 +00:00
|
|
|
std::unique_ptr<ReadBuffer> read_buf;
|
2023-06-16 15:51:18 +00:00
|
|
|
InputFormatPtr input_format;
|
2021-07-20 18:18:43 +00:00
|
|
|
std::unique_ptr<QueryPipeline> pipeline;
|
|
|
|
std::unique_ptr<PullingPipelineExecutor> reader;
|
2019-01-27 00:38:30 +00:00
|
|
|
|
2020-10-02 12:38:50 +00:00
|
|
|
ColumnsDescription columns_description;
|
2022-08-06 17:29:33 +00:00
|
|
|
Block block_for_format;
|
2020-01-31 13:12:11 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr context; /// TODO Untangle potential issues with context lifetime.
|
2020-01-27 18:13:02 +00:00
|
|
|
UInt64 max_block_size;
|
2020-01-31 13:12:11 +00:00
|
|
|
|
|
|
|
bool finished_generate = false;
|
2020-01-27 18:13:02 +00:00
|
|
|
|
2020-09-24 23:29:16 +00:00
|
|
|
std::shared_lock<std::shared_timed_mutex> shared_lock;
|
2016-10-18 14:18:37 +00:00
|
|
|
};
|
|
|
|
|
2021-07-24 16:50:03 +00:00
|
|
|
|
2020-08-03 13:54:14 +00:00
|
|
|
/// Creates the reading pipe: up to `max_num_streams` StorageFileSource instances that share one FilesInfo.
///
/// If the only file does not exist (and no archive is used), either an empty source is returned
/// (setting `engine_file_empty_if_not_exists`) or FILE_DOESNT_EXIST is thrown.
Pipe StorageFile::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & /*query_info*/,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    const size_t max_num_streams)
{
    if (use_table_fd)
    {
        paths = {""};   /// when use fd, paths are empty
    }
    else
    {
        if (paths.size() == 1 && paths_to_archive.empty() && !fs::exists(paths[0]))
        {
            if (context->getSettingsRef().engine_file_empty_if_not_exists)
                return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
            else
                throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", paths[0]);
        }
    }

    auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
    files_info->files = paths;
    files_info->paths_to_archive = paths_to_archive;
    files_info->total_bytes_to_read = total_bytes_to_read;

    for (const auto & column : column_names)
    {
        if (column == "_path")
            files_info->need_path_column = true;
        if (column == "_file")
            files_info->need_file_column = true;
    }

    auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());

    /// There is no point in creating more streams than there are inputs to read.
    /// In archive mode the inputs are the archives (`paths` then holds just the file name inside them),
    /// so the archives are counted; otherwise several archives would always be read by a single stream.
    const size_t inputs_to_read = paths_to_archive.empty() ? paths.size() : paths_to_archive.size();
    size_t num_streams = max_num_streams;
    if (max_num_streams > inputs_to_read)
        num_streams = inputs_to_read;

    Pipes pipes;
    pipes.reserve(num_streams);

    /// Set total number of bytes to process. For progress bar.
    auto progress_callback = context->getFileProgressCallback();

    if (progress_callback)
        progress_callback(FileProgress(0, total_bytes_to_read));

    for (size_t i = 0; i < num_streams; ++i)
    {
        ColumnsDescription columns_description;
        Block block_for_format;
        if (supportsSubsetOfColumns())
        {
            /// Only the requested physical columns are read from the format; virtual columns are added by the source.
            auto fetch_columns = column_names;
            const auto & virtuals = getVirtuals();
            std::erase_if(
                fetch_columns,
                [&](const String & col)
                {
                    return std::any_of(
                        virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
                });

            /// Something has to be read from the file even if only virtual columns were requested.
            if (fetch_columns.empty())
                fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
            columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
        }
        else
        {
            columns_description = storage_snapshot->metadata->getColumns();
        }

        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());

        /// In case of reading from fd we have to check whether we have already created
        /// the read buffer from it in Storage constructor (for schema inference) or not.
        /// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
        /// to prevent data race in case of parallel reads.
        std::unique_ptr<ReadBuffer> read_buffer;
        if (has_peekable_read_buffer_from_fd.exchange(false))
            read_buffer = std::move(peekable_read_buffer_from_fd);

        pipes.emplace_back(std::make_shared<StorageFileSource>(
            this_ptr,
            storage_snapshot,
            context,
            max_block_size,
            files_info,
            columns_description,
            block_for_format,
            std::move(read_buffer)));
    }

    return Pipe::unitePipes(std::move(pipes));
}
|
|
|
|
|
|
|
|
|
2021-07-26 14:47:29 +00:00
|
|
|
class StorageFileSink final : public SinkToStorage
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
|
|
|
public:
|
2021-10-25 16:23:44 +00:00
|
|
|
StorageFileSink(
|
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String & table_name_for_log_,
|
|
|
|
int table_fd_,
|
|
|
|
bool use_table_fd_,
|
|
|
|
std::string base_path_,
|
2022-01-14 18:18:16 +00:00
|
|
|
std::string path_,
|
2021-10-25 16:23:44 +00:00
|
|
|
const CompressionMethod compression_method_,
|
|
|
|
const std::optional<FormatSettings> & format_settings_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String format_name_,
|
|
|
|
ContextPtr context_,
|
2021-10-25 16:23:44 +00:00
|
|
|
int flags_)
|
|
|
|
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
|
|
|
, metadata_snapshot(metadata_snapshot_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, table_name_for_log(table_name_for_log_)
|
|
|
|
, table_fd(table_fd_)
|
|
|
|
, use_table_fd(use_table_fd_)
|
|
|
|
, base_path(base_path_)
|
2022-01-14 18:18:16 +00:00
|
|
|
, path(path_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, compression_method(compression_method_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, format_name(format_name_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, format_settings(format_settings_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, context(context_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, flags(flags_)
|
|
|
|
{
|
|
|
|
initialize();
|
|
|
|
}
|
|
|
|
|
|
|
|
StorageFileSink(
|
2020-06-16 15:51:29 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String & table_name_for_log_,
|
2020-09-24 23:29:16 +00:00
|
|
|
std::unique_lock<std::shared_timed_mutex> && lock_,
|
2021-10-28 12:44:12 +00:00
|
|
|
int table_fd_,
|
|
|
|
bool use_table_fd_,
|
|
|
|
std::string base_path_,
|
2022-01-14 18:18:16 +00:00
|
|
|
const std::string & path_,
|
2021-10-25 16:23:44 +00:00
|
|
|
const CompressionMethod compression_method_,
|
|
|
|
const std::optional<FormatSettings> & format_settings_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String format_name_,
|
|
|
|
ContextPtr context_,
|
2021-10-25 16:23:44 +00:00
|
|
|
int flags_)
|
2021-07-26 10:08:40 +00:00
|
|
|
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
2020-06-16 15:51:29 +00:00
|
|
|
, metadata_snapshot(metadata_snapshot_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, table_name_for_log(table_name_for_log_)
|
|
|
|
, table_fd(table_fd_)
|
|
|
|
, use_table_fd(use_table_fd_)
|
|
|
|
, base_path(base_path_)
|
2022-01-14 18:18:16 +00:00
|
|
|
, path(path_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, compression_method(compression_method_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, format_name(format_name_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, format_settings(format_settings_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, context(context_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, flags(flags_)
|
2020-09-24 23:29:16 +00:00
|
|
|
, lock(std::move(lock_))
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2020-09-24 23:29:16 +00:00
|
|
|
if (!lock)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
|
2021-10-25 16:23:44 +00:00
|
|
|
initialize();
|
|
|
|
}
|
2020-09-24 23:29:16 +00:00
|
|
|
|
2021-10-25 16:23:44 +00:00
|
|
|
void initialize()
|
|
|
|
{
|
2020-07-07 11:45:20 +00:00
|
|
|
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
|
2021-10-28 12:44:12 +00:00
|
|
|
if (use_table_fd)
|
2016-10-28 17:38:32 +00:00
|
|
|
{
|
2021-10-28 12:44:12 +00:00
|
|
|
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(table_fd, DBMS_DEFAULT_BUFFER_SIZE);
|
2016-10-28 17:38:32 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-02-19 23:27:23 +00:00
|
|
|
flags |= O_WRONLY | O_APPEND | O_CREAT;
|
2022-01-14 18:18:16 +00:00
|
|
|
naked_buffer = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, flags);
|
2016-10-28 17:38:32 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-11-02 13:40:41 +00:00
|
|
|
/// In case of formats with prefixes if file is not empty we have already written prefix.
|
|
|
|
bool do_not_write_prefix = naked_buffer->size();
|
2020-07-07 11:45:20 +00:00
|
|
|
|
|
|
|
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
|
|
|
|
|
2021-10-28 12:44:12 +00:00
|
|
|
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
|
2022-10-28 16:41:10 +00:00
|
|
|
*write_buf, metadata_snapshot->getSampleBlock(), context, format_settings);
|
2021-11-02 13:40:41 +00:00
|
|
|
|
|
|
|
if (do_not_write_prefix)
|
|
|
|
writer->doNotWritePrefix();
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-07-23 19:33:59 +00:00
|
|
|
String getName() const override { return "StorageFileSink"; }
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-07-26 14:47:29 +00:00
|
|
|
void consume(Chunk chunk) override
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2022-07-21 12:18:37 +00:00
|
|
|
std::lock_guard cancel_lock(cancel_mutex);
|
|
|
|
if (cancelled)
|
|
|
|
return;
|
2021-09-03 17:29:36 +00:00
|
|
|
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2022-07-21 12:18:37 +00:00
|
|
|
void onCancel() override
|
|
|
|
{
|
|
|
|
std::lock_guard cancel_lock(cancel_mutex);
|
|
|
|
finalize();
|
|
|
|
cancelled = true;
|
|
|
|
}
|
|
|
|
|
2023-06-22 12:33:25 +00:00
|
|
|
void onException(std::exception_ptr exception) override
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
2022-07-21 12:18:37 +00:00
|
|
|
std::lock_guard cancel_lock(cancel_mutex);
|
2023-06-22 12:33:25 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
std::rethrow_exception(exception);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
/// An exception context is needed to proper delete write buffers without finalization
|
|
|
|
release();
|
|
|
|
}
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
|
|
|
|
2022-05-06 17:30:18 +00:00
|
|
|
void onFinish() override
|
|
|
|
{
|
2022-07-21 12:18:37 +00:00
|
|
|
std::lock_guard cancel_lock(cancel_mutex);
|
|
|
|
finalize();
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
void finalize()
|
|
|
|
{
|
|
|
|
if (!writer)
|
|
|
|
return;
|
|
|
|
|
2022-05-06 17:30:18 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
writer->finalize();
|
|
|
|
writer->flush();
|
|
|
|
write_buf->finalize();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
/// Stop ParallelFormattingOutputFormat correctly.
|
2023-06-22 12:33:25 +00:00
|
|
|
release();
|
2022-05-06 17:30:18 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-22 12:33:25 +00:00
|
|
|
void release()
|
|
|
|
{
|
|
|
|
writer.reset();
|
|
|
|
write_buf->finalize();
|
|
|
|
}
|
|
|
|
|
2020-06-16 15:51:29 +00:00
|
|
|
StorageMetadataPtr metadata_snapshot;
|
2021-10-28 12:44:12 +00:00
|
|
|
String table_name_for_log;
|
2021-10-25 16:23:44 +00:00
|
|
|
|
2019-11-19 12:46:07 +00:00
|
|
|
std::unique_ptr<WriteBuffer> write_buf;
|
2021-10-11 16:11:50 +00:00
|
|
|
OutputFormatPtr writer;
|
2021-10-28 12:44:12 +00:00
|
|
|
|
|
|
|
int table_fd;
|
|
|
|
bool use_table_fd;
|
|
|
|
std::string base_path;
|
2022-01-14 18:18:16 +00:00
|
|
|
std::string path;
|
2021-10-28 12:44:12 +00:00
|
|
|
CompressionMethod compression_method;
|
|
|
|
std::string format_name;
|
|
|
|
std::optional<FormatSettings> format_settings;
|
|
|
|
|
|
|
|
ContextPtr context;
|
|
|
|
int flags;
|
|
|
|
std::unique_lock<std::shared_timed_mutex> lock;
|
2022-07-21 12:18:37 +00:00
|
|
|
|
|
|
|
std::mutex cancel_mutex;
|
|
|
|
bool cancelled = false;
|
2016-10-18 14:18:37 +00:00
|
|
|
};
|
|
|
|
|
2021-10-25 16:23:44 +00:00
|
|
|
class PartitionedStorageFileSink : public PartitionedSink
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
PartitionedStorageFileSink(
|
|
|
|
const ASTPtr & partition_by,
|
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String & table_name_for_log_,
|
2021-10-25 16:23:44 +00:00
|
|
|
std::unique_lock<std::shared_timed_mutex> && lock_,
|
2021-10-28 13:56:45 +00:00
|
|
|
String base_path_,
|
|
|
|
String path_,
|
2021-10-25 16:23:44 +00:00
|
|
|
const CompressionMethod compression_method_,
|
|
|
|
const std::optional<FormatSettings> & format_settings_,
|
2021-10-28 12:44:12 +00:00
|
|
|
const String format_name_,
|
|
|
|
ContextPtr context_,
|
|
|
|
int flags_)
|
2021-10-25 16:23:44 +00:00
|
|
|
: PartitionedSink(partition_by, context_, metadata_snapshot_->getSampleBlock())
|
2021-10-28 13:56:45 +00:00
|
|
|
, path(path_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, metadata_snapshot(metadata_snapshot_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, table_name_for_log(table_name_for_log_)
|
|
|
|
, base_path(base_path_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, compression_method(compression_method_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, format_name(format_name_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, format_settings(format_settings_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, context(context_)
|
2021-10-25 16:23:44 +00:00
|
|
|
, flags(flags_)
|
2021-10-28 12:44:12 +00:00
|
|
|
, lock(std::move(lock_))
|
2021-10-25 16:23:44 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
SinkPtr createSinkForPartition(const String & partition_id) override
|
|
|
|
{
|
|
|
|
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
|
|
|
|
PartitionedSink::validatePartitionKey(partition_path, true);
|
2022-04-25 21:44:43 +00:00
|
|
|
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path, /*can_be_directory=*/ true);
|
2021-10-25 16:23:44 +00:00
|
|
|
return std::make_shared<StorageFileSink>(
|
2021-10-28 12:44:12 +00:00
|
|
|
metadata_snapshot,
|
|
|
|
table_name_for_log,
|
2021-10-28 13:56:45 +00:00
|
|
|
-1,
|
|
|
|
/* use_table_fd */false,
|
2021-10-28 12:44:12 +00:00
|
|
|
base_path,
|
2022-01-14 18:18:16 +00:00
|
|
|
partition_path,
|
2021-10-28 12:44:12 +00:00
|
|
|
compression_method,
|
|
|
|
format_settings,
|
|
|
|
format_name,
|
|
|
|
context,
|
|
|
|
flags);
|
2021-10-25 16:23:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
const String path;
|
|
|
|
StorageMetadataPtr metadata_snapshot;
|
2021-10-28 12:44:12 +00:00
|
|
|
String table_name_for_log;
|
|
|
|
|
|
|
|
std::string base_path;
|
|
|
|
CompressionMethod compression_method;
|
|
|
|
std::string format_name;
|
|
|
|
std::optional<FormatSettings> format_settings;
|
2021-10-25 16:23:44 +00:00
|
|
|
|
|
|
|
ContextPtr context;
|
|
|
|
int flags;
|
2021-10-28 12:44:12 +00:00
|
|
|
std::unique_lock<std::shared_timed_mutex> lock;
|
2016-10-18 14:18:37 +00:00
|
|
|
};
|
|
|
|
|
2021-10-25 16:23:44 +00:00
|
|
|
|
2021-07-23 14:25:35 +00:00
|
|
|
/// Creates the sink used to insert data into this table.
///
/// - The "Distributed" format cannot be written and is rejected.
/// - If the query is an INSERT with PARTITION BY and the partitioned-write path
///   contains the partition id wildcard, a PartitionedStorageFileSink is returned.
/// - Otherwise a plain StorageFileSink is returned. When the table has paths,
///   the last one is the target; if appending to a non-empty file is impossible
///   for the format, either a new numbered file is registered (setting
///   engine_file_allow_create_multiple_files) or an exception is thrown.
SinkToStoragePtr StorageFile::write(
    const ASTPtr & query,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    bool /*async_insert*/)
{
    if (format_name == "Distributed")
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not implemented for Distributed format");

    /// Flags handed over to the sink; truncation is requested through a setting.
    const int flags = context->getSettingsRef().engine_file_truncate_on_insert ? O_TRUNC : 0;

    const auto * insert_ast = dynamic_cast<const ASTInsertQuery *>(query.get());
    const bool path_has_partition_wildcard
        = path_for_partitioned_write.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;

    if (insert_ast && insert_ast->partition_by && path_has_partition_wildcard)
    {
        if (path_for_partitioned_write.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write");

        fs::create_directories(fs::path(path_for_partitioned_write).parent_path());

        return std::make_shared<PartitionedStorageFileSink>(
            insert_ast->partition_by,
            metadata_snapshot,
            getStorageID().getNameForLogs(),
            std::unique_lock{rwlock, getLockTimeout(context)},
            base_path,
            path_for_partitioned_write,
            chooseCompressionMethod(path_for_partitioned_write, compression_method),
            format_settings,
            format_name,
            context,
            flags);
    }

    /// Non-partitioned write. `path` stays empty when the table has no paths.
    String path;
    if (!paths.empty())
    {
        if (is_path_with_globs)
            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
                            "Table '{}' is in readonly mode because of globs in filepath",
                            getStorageID().getNameForLogs());

        path = paths.back();
        fs::create_directories(fs::path(path).parent_path());

        /// Appending is impossible when the file is not truncated, the format does not
        /// support appends and the current target file is known to be non-empty.
        std::error_code error_code;
        const bool cannot_append = !context->getSettingsRef().engine_file_truncate_on_insert
            && !is_path_with_globs
            && !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
            && fs::file_size(paths.back(), error_code) != 0
            && !error_code;

        if (cannot_append)
        {
            if (!context->getSettingsRef().engine_file_allow_create_multiple_files)
                throw Exception(
                    ErrorCodes::CANNOT_APPEND_TO_FILE,
                    "Cannot append data in format {} to file, because this format doesn't support appends."
                    " You can allow to create a new file "
                    "on each insert by enabling setting engine_file_allow_create_multiple_files",
                    format_name);

            /// Build "<stem>.<index><extension>" from the first path, where the extension
            /// starts at the first '.' found from the last '/' onwards (if there is one).
            const auto extension_pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
            const String stem = paths[0].substr(0, extension_pos);
            const String extension = extension_pos == std::string::npos ? "" : paths[0].substr(extension_pos);

            /// Start from the number of known paths and take the first name not present on disk.
            String candidate;
            for (size_t index = paths.size();; ++index)
            {
                candidate = stem + "." + std::to_string(index) + extension;
                if (!fs::exists(candidate))
                    break;
            }

            paths.push_back(candidate);
            path = candidate;
        }
    }

    return std::make_shared<StorageFileSink>(
        metadata_snapshot,
        getStorageID().getNameForLogs(),
        std::unique_lock{rwlock, getLockTimeout(context)},
        table_fd,
        use_table_fd,
        base_path,
        path,
        chooseCompressionMethod(path, compression_method),
        format_settings,
        format_name,
        context,
        flags);
}
|
2020-09-16 19:58:27 +00:00
|
|
|
|
2020-11-01 17:38:43 +00:00
|
|
|
bool StorageFile::storesDataOnDisk() const
|
|
|
|
{
|
|
|
|
return is_db_table;
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
|
|
|
|
2019-09-06 08:53:32 +00:00
|
|
|
/// Returns a copy of the table's file paths.
/// Throws DATABASE_ACCESS_DENIED when the table has no paths at all.
Strings StorageFile::getDataPaths() const
{
    if (!paths.empty())
        return paths;

    throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Table '{}' is in readonly mode", getStorageID().getNameForLogs());
}
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2020-04-07 14:05:51 +00:00
|
|
|
void StorageFile::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
|
2016-10-18 14:18:37 +00:00
|
|
|
{
|
|
|
|
if (!is_db_table)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
|
|
|
"Can't rename table {} bounded to user-defined file (or FD)", getStorageID().getNameForLogs());
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2019-09-04 11:11:30 +00:00
|
|
|
if (paths.size() != 1)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't rename table {} in readonly mode", getStorageID().getNameForLogs());
|
2019-09-04 11:11:30 +00:00
|
|
|
|
2020-09-17 19:50:43 +00:00
|
|
|
std::string path_new = getTablePath(base_path + new_path_to_table_data, format_name);
|
|
|
|
if (path_new == paths[0])
|
|
|
|
return;
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::create_directories(fs::path(path_new).parent_path());
|
|
|
|
fs::rename(paths[0], path_new);
|
2016-10-18 14:18:37 +00:00
|
|
|
|
2019-09-04 11:11:30 +00:00
|
|
|
paths[0] = std::move(path_new);
|
2020-04-07 14:05:51 +00:00
|
|
|
renameInMemory(new_table_id);
|
2016-10-18 14:18:37 +00:00
|
|
|
}
|
|
|
|
|
2020-06-18 10:29:13 +00:00
|
|
|
void StorageFile::truncate(
|
|
|
|
const ASTPtr & /*query*/,
|
|
|
|
const StorageMetadataPtr & /* metadata_snapshot */,
|
2021-04-10 23:33:54 +00:00
|
|
|
ContextPtr /* context */,
|
2020-06-18 16:10:47 +00:00
|
|
|
TableExclusiveLockHolder &)
|
2020-01-05 02:57:09 +00:00
|
|
|
{
|
2021-12-29 18:03:15 +00:00
|
|
|
if (is_path_with_globs)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't truncate table '{}' in readonly mode", getStorageID().getNameForLogs());
|
2020-01-05 02:57:09 +00:00
|
|
|
|
|
|
|
if (use_table_fd)
|
|
|
|
{
|
|
|
|
if (0 != ::ftruncate(table_fd, 0))
|
|
|
|
throwFromErrno("Cannot truncate file at fd " + toString(table_fd), ErrorCodes::CANNOT_TRUNCATE_FILE);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-12-29 18:03:15 +00:00
|
|
|
for (const auto & path : paths)
|
|
|
|
{
|
|
|
|
if (!fs::exists(path))
|
|
|
|
continue;
|
2020-01-05 20:11:26 +00:00
|
|
|
|
2021-12-29 18:03:15 +00:00
|
|
|
if (0 != ::truncate(path.c_str(), 0))
|
|
|
|
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
|
|
|
|
}
|
2020-01-05 02:57:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-12-30 00:36:06 +00:00
|
|
|
|
|
|
|
/// Registers the "File" table engine in the storage factory.
///
/// Engine arguments (1 to 3): format name, optional source, optional compression method.
/// - One argument: the table lives at the factory-provided relative data path.
/// - Source given as identifier stdin / stdout / stderr or as an integer literal:
///   the table is bound to that file descriptor.
/// - Source given as a string literal: it is parsed by StorageFile::parseFileSource
///   into a file path and a path to archive, resolved against the user files path.
void registerStorageFile(StorageFactory & factory)
{
    StorageFactory::StorageFeatures features{
        .supports_settings = true,
        .supports_schema_inference = true,
        .source_access_type = AccessType::FILE,
    };

    auto create_storage = [](const StorageFactory::Arguments & factory_args)
    {
        StorageFile::CommonArguments common_args
        {
            WithContext(factory_args.getContext()),
            factory_args.table_id,
            {},
            {},
            {},
            factory_args.columns,
            factory_args.constraints,
            factory_args.comment,
            {},
            {},
        };

        ASTs & engine_args = factory_args.engine_args;
        const size_t args_count = engine_args.size();

        if (args_count < 1 || args_count > 3)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                            "Storage File requires from 1 to 3 arguments: "
                            "name of used format, source and compression_method.");

        engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], factory_args.getLocalContext());
        common_args.format_name = checkAndGetLiteralArgument<String>(engine_args[0], "format_name");

        /// Format settings come from the global server context plus the SETTINGS
        /// clause of the CREATE query. Settings of the current session and user
        /// are ignored.
        if (!factory_args.storage_def->settings)
        {
            common_args.format_settings = getFormatSettings(factory_args.getContext());
        }
        else
        {
            FormatFactorySettings table_format_settings;

            /// Take over the changed settings of the global context, skipping the
            /// ones that are not format settings (only format settings are known here).
            const auto & global_changes = factory_args.getContext()->getSettingsRef().changes();
            for (const auto & changed : global_changes)
            {
                if (!table_format_settings.has(changed.name))
                    continue;
                table_format_settings.set(changed.name, changed.value);
            }

            /// Changes from the SETTINGS clause are applied with validation.
            table_format_settings.applyChanges(factory_args.storage_def->settings->changes);

            common_args.format_settings = getFormatSettings(factory_args.getContext(), table_format_settings);
        }

        /// Only the format is given: table in database.
        if (args_count == 1)
            return std::make_shared<StorageFile>(factory_args.relative_data_path, common_args);

        /// A file descriptor is used when the second argument is an integer literal
        /// or one of the std* identifiers; otherwise a path is expected.
        int fd = -1;
        String file_path;

        const auto & source_ast = engine_args[1];
        if (auto identifier = tryGetIdentifierName(source_ast))
        {
            if (*identifier == "stdin")
                fd = STDIN_FILENO;
            else if (*identifier == "stdout")
                fd = STDOUT_FILENO;
            else if (*identifier == "stderr")
                fd = STDERR_FILENO;
            else
                throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier '{}' in second arg of File storage constructor",
                                *identifier);
        }
        else if (const auto * source_literal = source_ast->as<ASTLiteral>())
        {
            const auto & value = source_literal->value;
            const auto value_type = value.getType();

            if (value_type == Field::Types::Int64)
                fd = static_cast<int>(value.get<Int64>());
            else if (value_type == Field::Types::UInt64)
                fd = static_cast<int>(value.get<UInt64>());
            else if (value_type == Field::Types::String)
                StorageFile::parseFileSource(value.get<String>(), file_path, common_args.path_to_archive);
            else
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second argument must be path or file descriptor");
        }

        /// Compression method defaults to "auto" when there is no third argument.
        if (args_count != 3)
        {
            common_args.compression_method = "auto";
        }
        else
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], factory_args.getLocalContext());
            common_args.compression_method = checkAndGetLiteralArgument<String>(engine_args[2], "compression_method");
        }

        /// Non-negative descriptor: table over a file descriptor.
        if (fd >= 0)
            return std::make_shared<StorageFile>(fd, common_args);

        /// Otherwise: user's file.
        return std::make_shared<StorageFile>(file_path, factory_args.getContext()->getUserFilesPath(), common_args);
    };

    factory.registerStorage("File", create_storage, features);
}
|
2020-11-02 07:50:38 +00:00
|
|
|
|
|
|
|
|
2020-04-28 10:38:57 +00:00
|
|
|
/// Virtual columns of the File engine: `_path` and `_file`,
/// both of type LowCardinality(String).
NamesAndTypesList StorageFile::getVirtuals() const
{
    /// Each column gets its own type object.
    auto low_cardinality_string = []
    {
        return std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    };

    return NamesAndTypesList{
        {"_path", low_cardinality_string()},
        {"_file", low_cardinality_string()}};
}
|
2022-06-21 13:02:48 +00:00
|
|
|
|
2022-08-05 16:20:15 +00:00
|
|
|
/// Returns the schema cache shared by all File storages.
/// The cache is a function-local static: it is constructed on the first call,
/// sized from the config key "schema_inference_cache_max_elements_for_file"
/// (default DEFAULT_SCHEMA_CACHE_ELEMENTS) of the context passed to that call.
/// Later calls return the same object whatever context they receive.
SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
{
    static SchemaCache file_schema_cache(
        context->getConfigRef().getUInt(
            "schema_inference_cache_max_elements_for_file",
            DEFAULT_SCHEMA_CACHE_ELEMENTS));

    return file_schema_cache;
}
|
|
|
|
|
2022-06-27 12:43:24 +00:00
|
|
|
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
|
|
|
|
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
|
2022-06-21 13:02:48 +00:00
|
|
|
{
|
|
|
|
/// Check if the cache contains one of the paths.
|
2022-08-05 16:20:15 +00:00
|
|
|
auto & schema_cache = getSchemaCache(context);
|
2022-06-21 13:02:48 +00:00
|
|
|
struct stat file_stat{};
|
|
|
|
for (const auto & path : paths)
|
|
|
|
{
|
2022-06-21 17:18:14 +00:00
|
|
|
auto get_last_mod_time = [&]() -> std::optional<time_t>
|
2022-06-21 13:02:48 +00:00
|
|
|
{
|
|
|
|
if (0 != stat(path.c_str(), &file_stat))
|
2022-06-21 17:18:14 +00:00
|
|
|
return std::nullopt;
|
2022-06-21 13:02:48 +00:00
|
|
|
|
2022-06-28 16:13:42 +00:00
|
|
|
return file_stat.st_mtime;
|
2022-06-21 13:02:48 +00:00
|
|
|
};
|
|
|
|
|
2022-08-19 16:42:23 +00:00
|
|
|
auto cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
|
2022-06-27 12:43:24 +00:00
|
|
|
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
|
2022-06-21 13:02:48 +00:00
|
|
|
if (columns)
|
|
|
|
return columns;
|
|
|
|
}
|
|
|
|
|
|
|
|
return std::nullopt;
|
|
|
|
}
|
|
|
|
|
2022-06-27 12:43:24 +00:00
|
|
|
void StorageFile::addColumnsToCache(
|
|
|
|
const Strings & paths,
|
|
|
|
const ColumnsDescription & columns,
|
|
|
|
const String & format_name,
|
|
|
|
const std::optional<FormatSettings> & format_settings,
|
|
|
|
const ContextPtr & context)
|
2022-06-21 13:02:48 +00:00
|
|
|
{
|
2022-08-05 16:20:15 +00:00
|
|
|
auto & schema_cache = getSchemaCache(context);
|
2022-08-19 16:42:23 +00:00
|
|
|
auto cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
|
2022-08-05 16:20:15 +00:00
|
|
|
schema_cache.addMany(cache_keys, columns);
|
2022-06-21 13:02:48 +00:00
|
|
|
}
|
|
|
|
|
2023-07-31 12:04:27 +00:00
|
|
|
/// Splits a source string of the form "<path to archive> :: <file name>".
///
/// @param source           the source string, taken by value.
/// @param filename         receives the whole source when it contains no "::",
///                         otherwise the part after the first "::" with leading
///                         spaces removed.
/// @param path_to_archive  receives the part before the first "::" with trailing
///                         spaces removed; left untouched when there is no "::".
/// @throws Exception(BAD_ARGUMENTS) when "::" is present but the archive path or
///         the file name is empty after trimming. The archive path is checked and
///         assigned first, so `path_to_archive` is already updated when the file
///         name turns out to be empty.
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive)
{
    size_t pos = source.find("::");
    if (pos == String::npos)
    {
        filename = std::move(source);
        return;
    }

    std::string_view path_to_archive_view = std::string_view{source}.substr(0, pos);
    /// Check for emptiness first: back() on an empty view is undefined behaviour,
    /// and the view is empty for inputs like "::file" or "   ::file".
    while (!path_to_archive_view.empty() && path_to_archive_view.back() == ' ')
        path_to_archive_view.remove_suffix(1);

    if (path_to_archive_view.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");

    path_to_archive = path_to_archive_view;

    std::string_view filename_view = std::string_view{source}.substr(pos + 2);
    /// Same guard for front(): the view is empty for inputs like "archive::" or "archive::   ".
    while (!filename_view.empty() && filename_view.front() == ' ')
        filename_view.remove_prefix(1);

    if (filename_view.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");

    filename = filename_view;
}
|
|
|
|
|
2021-03-30 21:25:37 +00:00
|
|
|
}
|