// ClickHouse/src/Storages/StorageFile.cpp


#include <Storages/StorageFile.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier_fwd.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <Common/filesystemHelpers.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/PartitionedSink.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <re2/re2.h>
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/ISource.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/Sources/NullSource.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>

namespace fs = std::filesystem;

namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
    extern const int CANNOT_FSTAT;
    extern const int CANNOT_TRUNCATE_FILE;
    extern const int DATABASE_ACCESS_DENIED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int INCORRECT_FILE_NAME;
    extern const int FILE_DOESNT_EXIST;
    extern const int TIMEOUT_EXCEEDED;
    extern const int INCOMPATIBLE_COLUMNS;
    extern const int CANNOT_STAT;
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_APPEND_TO_FILE;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}

namespace
{
/* Recursive directory listing that collects the paths matching a glob pattern.
 * StorageHDFS has the same method.
 */
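/* Illustrative example: for_match = "/dir/*/data_*.csv" is consumed one path
 * component at a time. The non-glob prefix "/dir/" is listed first; every entry
 * matching "*" is a directory to descend into with the rest of the pattern
 * ("/data_*.csv"), and at the last component only regular files are matched.
 */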
void listFilesWithRegexpMatchingImpl(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result)
{
    const size_t first_glob = for_match.find_first_of("*?{");

    const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
    const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begins with '/'

    const size_t next_slash = suffix_with_globs.find('/', 1);
    auto regexp = makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash));
    re2::RE2 matcher(regexp);

    const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);

    if (!fs::exists(prefix_without_globs))
        return;

    const fs::directory_iterator end;
    for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
    {
        const std::string full_path = it->path().string();
        const size_t last_slash = full_path.rfind('/');
        const String file_name = full_path.substr(last_slash);
        const bool looking_for_directory = next_slash != std::string::npos;

        /// If the glob component is not the last one in the pattern, we are looking for a directory to descend into.
        if (!it->is_directory() && !looking_for_directory)
        {
            if (re2::RE2::FullMatch(file_name, matcher))
            {
                total_bytes_to_read += it->file_size();
                result.push_back(it->path().string());
            }
        }
        else if (it->is_directory() && looking_for_directory)
        {
            if (re2::RE2::FullMatch(file_name, matcher))
            {
                /// Recursion depth is limited by the pattern: '*' works only for depth 1, for depth 2 the pattern is '*/*'. So no additional check is needed.
                listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result);
            }
        }
    }
}

std::vector<std::string> listFilesWithRegexpMatching(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read)
{
std::vector<std::string> result;
listFilesWithRegexpMatchingImpl(path_for_ls, for_match, total_bytes_to_read, result);
return result;
}

std::string getTablePath(const std::string & table_dir_path, const std::string & format_name)
{
    return table_dir_path + "/data." + escapeForFileName(format_name);
}

/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
void checkCreationIsAllowed(
ContextPtr context_global,
const std::string & db_dir_path,
const std::string & table_path,
bool can_be_directory)
{
if (context_global->getApplicationType() != Context::ApplicationType::SERVER)
return;

    /// "/dev/null" is allowed for perf testing
if (!fileOrSymlinkPathStartsWith(table_path, db_dir_path) && table_path != "/dev/null")
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "File `{}` is not inside `{}`", table_path, db_dir_path);
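
    /// `can_be_directory` means the given path may resolve to a directory (e.g. it was written
    /// explicitly rather than produced by glob listing, which returns only regular files),
    /// so that case must be checked and rejected here.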
if (can_be_directory)
{
auto table_path_stat = fs::status(table_path);
if (fs::exists(table_path_stat) && fs::is_directory(table_path_stat))
throw Exception("File must not be a directory", ErrorCodes::INCORRECT_FILE_NAME);
}
}
std::unique_ptr<ReadBuffer> createReadBuffer(
const String & current_path,
bool use_table_fd,
const String & storage_name,
int table_fd,
const String & compression_method,
ContextPtr context)
{
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
struct stat file_stat{};
if (use_table_fd)
{
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != fstat(table_fd, &file_stat))
throwFromErrno("Cannot stat table file descriptor, inside " + storage_name, ErrorCodes::CANNOT_STAT);
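        /// A "PRead" buffer reads via pread() (random access), which is only valid for
        /// regular files; pipes, sockets and the like fall back to a plain sequential buffer.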
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFileDescriptorPRead>(table_fd);
else
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(table_fd);
method = chooseCompressionMethod("", compression_method);
}
else
{
        /// Check if the file allows random reads (and reading it twice).
if (0 != stat(current_path.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT);
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFilePRead>(current_path, context->getSettingsRef().max_read_buffer_size);
else
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
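        /// With the "auto" method, compression is inferred from the file extension (e.g. ".gz").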
method = chooseCompressionMethod(current_path, compression_method);
}
/// For clickhouse-local and clickhouse-client add progress callback to display progress bar.
if (context->getApplicationType() == Context::ApplicationType::LOCAL
|| context->getApplicationType() == Context::ApplicationType::CLIENT)
{
auto & in = static_cast<ReadBufferFromFileDescriptor &>(*nested_buffer);
in.setProgressCallback(context);
}
auto zstd_window_log_max = context->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method, zstd_window_log_max);
}

}

Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
{
    fs::path user_files_absolute_path = fs::weakly_canonical(user_files_path);
    fs::path fs_table_path(table_path);
    if (fs_table_path.is_relative())
        fs_table_path = user_files_absolute_path / fs_table_path;

    Strings paths;

    /// Do not use fs::canonical or fs::weakly_canonical.
    /// Otherwise it will not allow working with symlinks in the `user_files_path` directory.
    String path = fs::absolute(fs_table_path).lexically_normal(); /// Normalize path.
    bool can_be_directory = true;

    if (path.find(PartitionedSink::PARTITION_ID_WILDCARD) != std::string::npos)
    {
        paths.push_back(path);
    }
    else if (path.find_first_of("*?{") == std::string::npos)
    {
        std::error_code error;
        size_t size = fs::file_size(path, error);
        if (!error)
            total_bytes_to_read += size;

        paths.push_back(path);
    }
    else
    {
        /// We list only non-directory files.
        paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
        can_be_directory = false;
    }

for (const auto & cur_path : paths)
checkCreationIsAllowed(context, user_files_absolute_path, cur_path, can_be_directory);
return paths;
}
ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr context)
{
/// If we want to read schema from file descriptor we should create
/// a read buffer from fd, create a checkpoint, read some data required
/// for schema inference, rollback to checkpoint and then use the created
    /// peekable read buffer on the first read from storage. This is needed because
    /// a file descriptor gives a one-shot stream of data: after consuming some of it
    /// for schema inference, we cannot seek back and read from the beginning again.
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
read_buffer_from_fd = createReadBuffer("", true, getName(), table_fd, compression_method, context);
auto read_buf = std::make_unique<PeekableReadBuffer>(*read_buffer_from_fd);
read_buf->setCheckpoint();
return read_buf;
};
auto columns = readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, false, context, peekable_read_buffer_from_fd);
if (peekable_read_buffer_from_fd)
{
/// If we have created read buffer in readSchemaFromFormat we should rollback to checkpoint.
assert_cast<PeekableReadBuffer *>(peekable_read_buffer_from_fd.get())->rollbackToCheckpoint();
has_peekable_read_buffer_from_fd = true;
}
return columns;
}
ColumnsDescription StorageFile::getTableStructureFromFile(
const String & format,
const std::vector<String> & paths,
const String & compression_method,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
if (format == "Distributed")
{
if (paths.empty())
throw Exception(
"Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
auto source = StorageDistributedDirectoryMonitor::createSourceFromFile(paths[0]);
return ColumnsDescription(source->getOutputs().front().getHeader().getNamesAndTypesList());
}
if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. You must specify "
"table structure manually",
format);
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().use_cache_for_file_schema_inference)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
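
    /// The iterator hands out one read buffer per matched file: schema inference may need
    /// to try several files, e.g. when the first one is empty (retrying is allowed below
    /// when paths.size() > 1).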
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
        if (it == paths.end())
            return nullptr;

        return createReadBuffer(*it++, false, "File", -1, compression_method, context);
};
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
if (context->getSettingsRef().use_cache_for_file_schema_inference)
addColumnsToCache(paths, columns, format, format_settings, context);
return columns;
}

bool StorageFile::supportsSubsetOfColumns() const
{
    return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
}

StorageFile::StorageFile(int table_fd_, CommonArguments args)
: StorageFile(args)
{
    struct stat buf;
    int res = fstat(table_fd_, &buf);
    if (-1 == res)
        throwFromErrno("Cannot execute fstat", ErrorCodes::CANNOT_FSTAT);
    total_bytes_to_read = buf.st_size;

    if (args.getContext()->getApplicationType() == Context::ApplicationType::SERVER)
        throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
    if (args.format_name == "Distributed")
        throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);

    is_db_table = false;
    use_table_fd = true;
    table_fd = table_fd_;

    setStorageMetadata(args);
}
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
    : StorageFile(args)
{
    is_db_table = false;
    paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
    is_path_with_globs = paths.size() > 1;
    if (!paths.empty())
        path_for_partitioned_write = paths.front();
    else
        path_for_partitioned_write = table_path_;

    setStorageMetadata(args);
}
StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args)
: StorageFile(args)
{
if (relative_table_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);

    String table_dir_path = fs::path(base_path) / relative_table_dir_path / "";
    fs::create_directories(table_dir_path);
paths = {getTablePath(table_dir_path, format_name)};
std::error_code error;
size_t size = fs::file_size(paths[0], error);
if (!error)
total_bytes_to_read = size;
setStorageMetadata(args);
}

StorageFile::StorageFile(CommonArguments args)
    : IStorage(args.table_id)
    , format_name(args.format_name)
    , format_settings(args.format_settings)
    , compression_method(args.compression_method)
    , base_path(args.getContext()->getPath())
{
if (format_name != "Distributed")
FormatFactory::instance().checkFormatName(format_name);
}
void StorageFile::setStorageMetadata(CommonArguments args)
{
    StorageInMemoryMetadata storage_metadata;

    if (args.format_name == "Distributed" || args.columns.empty())
    {
        ColumnsDescription columns;
        if (use_table_fd)
            columns = getTableStructureFromFileDescriptor(args.getContext());
        else
        {
            columns = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext());
            if (!args.columns.empty() && args.columns != columns)
                throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
        }
        storage_metadata.setColumns(columns);
    }
    else
        storage_metadata.setColumns(args.columns);

    storage_metadata.setConstraints(args.constraints);
    storage_metadata.setComment(args.comment);
    setInMemoryMetadata(storage_metadata);
}
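
/// Bound the rwlock acquisition timeout by max_execution_time, so that a query blocked
/// on the table lock fails within its time budget instead of waiting the full lock_acquire_timeout.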
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}

using StorageFilePtr = std::shared_ptr<StorageFile>;

class StorageFileSource : public ISource
{
public:
struct FilesInfo
{
std::vector<std::string> files;
std::atomic<size_t> next_file_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
size_t total_bytes_to_read = 0;
};
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
static Block getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{
auto header = metadata_snapshot->getSampleBlock();
/// Note: AddingDefaultsTransform doesn't change header.
if (need_path_column)
header.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_path"});
if (need_file_column)
header.insert(
{DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumn(),
std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()),
"_file"});
return header;
}

    static Block getBlockForSource(
        const StorageFilePtr & storage,
        const StorageSnapshotPtr & storage_snapshot,
        const ColumnsDescription & columns_description,
        const FilesInfoPtr & files_info)
    {
        if (storage->supportsSubsetOfColumns())
            return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
        else
            return getHeader(storage_snapshot->metadata, files_info->need_path_column, files_info->need_file_column);
    }

StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_,
std::unique_ptr<ReadBuffer> read_buf_)
        : ISource(getBlockForSource(storage_, storage_snapshot_, columns_description_, files_info_))
        , storage(std::move(storage_))
        , storage_snapshot(storage_snapshot_)
        , files_info(std::move(files_info_))
        , read_buf(std::move(read_buf_))
, columns_description(std::move(columns_description_))
, context(context_)
, max_block_size(max_block_size_)
{
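        /// Readers take the table rwlock in shared mode; StorageFileSink takes it in
        /// exclusive mode, so reads do not interleave with concurrent INSERTs into the same file.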
if (!storage->use_table_fd)
{
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
if (!shared_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
String getName() const override
{
return storage->getName();
}
Chunk generate() override
{
while (!finished_generate)
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
if (!reader)
{
if (!storage->use_table_fd)
{
auto current_file = files_info->next_file_to_read.fetch_add(1);
if (current_file >= files_info->files.size())
return {};
current_path = files_info->files[current_file];
/// Special case for distributed format. Defaults are not needed here.
if (storage->format_name == "Distributed")
{
                        pipeline = std::make_unique<QueryPipeline>(StorageDistributedDirectoryMonitor::createSourceFromFile(current_path));
                        reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
                        continue;
                    }
                }

                if (!read_buf)
                    read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context);

                auto get_block_for_format = [&]() -> Block
                {
if (storage->supportsSubsetOfColumns())
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
return storage_snapshot->metadata->getSampleBlock();
                };

                auto format = context->getInputFormat(
                    storage->format_name, *read_buf, get_block_for_format(), max_block_size, storage->format_settings);

                QueryPipelineBuilder builder;
                builder.init(Pipe(format));

                if (columns_description.hasDefaults())
                {
                    builder.addSimpleTransform([&](const Block & header)
                    {
                        return std::make_shared<AddingDefaultsTransform>(header, columns_description, *format, context);
                    });
                }

                pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));

                reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
            }

Chunk chunk;
if (reader->pull(chunk))
{
                UInt64 num_rows = chunk.getNumRows();

                /// Enrich with virtual columns.
                if (files_info->need_path_column)
                {
                    auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
                    chunk.addColumn(column->convertToFullColumnIfConst());
                }

                if (files_info->need_file_column)
                {
                    size_t last_slash_pos = current_path.find_last_of('/');
                    auto file_name = current_path.substr(last_slash_pos + 1);

                    auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
                    chunk.addColumn(column->convertToFullColumnIfConst());
                }

if (num_rows)
{
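                    /// Worked example: a chunk of 1000 rows in 8000 bytes gives 8 bytes/row;
                    /// with 80000 bytes to read in total the estimate is 10000 rows. Estimates
                    /// from successive chunks are averaged to smooth the progress report.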
auto bytes_per_row = std::ceil(static_cast<double>(chunk.bytes()) / num_rows);
size_t total_rows_approx = std::ceil(static_cast<double>(files_info->total_bytes_to_read) / bytes_per_row);
total_rows_approx_accumulated += total_rows_approx;
++total_rows_count_times;
total_rows_approx = total_rows_approx_accumulated / total_rows_count_times;

                    /// We need to add the diff, because total_rows_approx is an incremental value.
                    /// It would be more correct to send total_rows_approx as is (not as a diff),
                    /// but the way total_rows_to_read is incremented does not allow that.
                    /// A new field could be introduced and sent to the client, but it is not worth it.
                    if (total_rows_approx > total_rows_approx_prev)
                    {
                        size_t diff = total_rows_approx - total_rows_approx_prev;
                        addTotalRowsApprox(diff);
                        total_rows_approx_prev = total_rows_approx;
                    }
                }

return chunk;
}
/// Read only once for file descriptor.
if (storage->use_table_fd)
finished_generate = true;
/// Close file prematurely if stream was ended.
reader.reset();
pipeline.reset();
read_buf.reset();
}
return {};
}
private:
std::shared_ptr<StorageFile> storage;
StorageSnapshotPtr storage_snapshot;
FilesInfoPtr files_info;
String current_path;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
    std::unique_ptr<QueryPipeline> pipeline;
    std::unique_ptr<PullingPipelineExecutor> reader;

    ColumnsDescription columns_description;

    ContextPtr context;    /// TODO Untangle potential issues with context lifetime.
    UInt64 max_block_size;

    bool finished_generate = false;

    std::shared_lock<std::shared_timed_mutex> shared_lock;

    UInt64 total_rows_approx_accumulated = 0;
    size_t total_rows_count_times = 0;
    UInt64 total_rows_approx_prev = 0;
};
Pipe StorageFile::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
    if (use_table_fd)
    {
        paths = {""};   /// When using a fd, there is no real path; a single empty placeholder is used.
    }
    else
    {
if (paths.size() == 1 && !fs::exists(paths[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
else
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}
}
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
files_info->total_bytes_to_read = total_bytes_to_read;
for (const auto & column : column_names)
{
if (column == "_path")
files_info->need_path_column = true;
if (column == "_file")
files_info->need_file_column = true;
}

    auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());

if (num_streams > paths.size())
num_streams = paths.size();
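
    /// There is no point in more streams than files: each extra stream would find
    /// files_info->next_file_to_read already exhausted and produce nothing.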
Pipes pipes;
pipes.reserve(num_streams);
/// Set total number of bytes to process. For progress bar.
auto progress_callback = context->getFileProgressCallback();
    if (progress_callback)
        progress_callback(FileProgress(0, total_bytes_to_read));

for (size_t i = 0; i < num_streams; ++i)
{
        const auto get_columns_for_format = [&]() -> ColumnsDescription
        {
            if (supportsSubsetOfColumns())
                return storage_snapshot->getDescriptionForColumns(column_names);
            else
                return storage_snapshot->metadata->getColumns();
};
/// In case of reading from fd we have to check whether we have already created
/// the read buffer from it in Storage constructor (for schema inference) or not.
/// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
/// to prevent data race in case of parallel reads.
std::unique_ptr<ReadBuffer> read_buffer;
if (has_peekable_read_buffer_from_fd.exchange(false))
read_buffer = std::move(peekable_read_buffer_from_fd);
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, storage_snapshot, context, max_block_size, files_info, get_columns_for_format(), std::move(read_buffer)));
}
return Pipe::unitePipes(std::move(pipes));
}
class StorageFileSink final : public SinkToStorage
{
public:
    StorageFileSink(
        const StorageMetadataPtr & metadata_snapshot_,
        const String & table_name_for_log_,
        int table_fd_,
        bool use_table_fd_,
        std::string base_path_,
        std::string path_,
        const CompressionMethod compression_method_,
        const std::optional<FormatSettings> & format_settings_,
        const String format_name_,
        ContextPtr context_,
        int flags_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , metadata_snapshot(metadata_snapshot_)
        , table_name_for_log(table_name_for_log_)
        , table_fd(table_fd_)
        , use_table_fd(use_table_fd_)
        , base_path(base_path_)
        , path(path_)
        , compression_method(compression_method_)
        , format_name(format_name_)
        , format_settings(format_settings_)
        , context(context_)
        , flags(flags_)
    {
        initialize();
    }
StorageFileSink(
const StorageMetadataPtr & metadata_snapshot_,
        const String & table_name_for_log_,
        std::unique_lock<std::shared_timed_mutex> && lock_,
        int table_fd_,
        bool use_table_fd_,
        std::string base_path_,
        const std::string & path_,
        const CompressionMethod compression_method_,
        const std::optional<FormatSettings> & format_settings_,
        const String format_name_,
        ContextPtr context_,
        int flags_)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , metadata_snapshot(metadata_snapshot_)
        , table_name_for_log(table_name_for_log_)
        , table_fd(table_fd_)
        , use_table_fd(use_table_fd_)
        , base_path(base_path_)
        , path(path_)
        , compression_method(compression_method_)
        , format_name(format_name_)
        , format_settings(format_settings_)
        , context(context_)
        , flags(flags_)
        , lock(std::move(lock_))
    {
        if (!lock)
            throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
        initialize();
}

    void initialize()
    {
        std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
        if (use_table_fd)
        {
            naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(table_fd, DBMS_DEFAULT_BUFFER_SIZE);
        }
        else
        {
            flags |= O_WRONLY | O_APPEND | O_CREAT;
            naked_buffer = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, flags);
        }

        /// For formats that write a prefix, a non-empty file means the prefix has already been written.
        bool do_not_write_prefix = naked_buffer->size();

        write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);

        writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
            *write_buf, metadata_snapshot->getSampleBlock(), context,
            {}, format_settings);

        if (do_not_write_prefix)
            writer->doNotWritePrefix();
}
String getName() const override { return "StorageFileSink"; }
void consume(Chunk chunk) override
{
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
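
    /// Note: finalize the format even on exception; otherwise the output format can be
    /// destroyed after its underlying buffer was already finalized, which manifests as a
    /// "Cannot write to finalized buffer" logical error (seen in stress tests).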
void onException() override
{
if (!writer)
return;
        onFinish();
}
void onFinish() override
{
try
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}
}
private:
StorageMetadataPtr metadata_snapshot;
    String table_name_for_log;

    std::unique_ptr<WriteBuffer> write_buf;
    OutputFormatPtr writer;

    int table_fd;
    bool use_table_fd;
    std::string base_path;
    std::string path;

CompressionMethod compression_method;
std::string format_name;
std::optional<FormatSettings> format_settings;
ContextPtr context;
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
};
class PartitionedStorageFileSink : public PartitionedSink
{
public:
PartitionedStorageFileSink(
const ASTPtr & partition_by,
const StorageMetadataPtr & metadata_snapshot_,
        const String & table_name_for_log_,
        std::unique_lock<std::shared_timed_mutex> && lock_,
        String base_path_,
        String path_,
        const CompressionMethod compression_method_,
        const std::optional<FormatSettings> & format_settings_,
        const String format_name_,
        ContextPtr context_,
        int flags_)
        : PartitionedSink(partition_by, context_, metadata_snapshot_->getSampleBlock())
        , path(path_)
        , metadata_snapshot(metadata_snapshot_)
        , table_name_for_log(table_name_for_log_)
        , base_path(base_path_)
        , compression_method(compression_method_)
        , format_name(format_name_)
        , format_settings(format_settings_)
        , context(context_)
        , flags(flags_)
        , lock(std::move(lock_))
{
}
SinkPtr createSinkForPartition(const String & partition_id) override
{
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
PartitionedSink::validatePartitionKey(partition_path, true);
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path, /*can_be_directory=*/ true);
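
        /// For example, the path "/data/out_{_partition_id}.csv" with partition id "2022"
        /// becomes "/data/out_2022.csv".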
        return std::make_shared<StorageFileSink>(
            metadata_snapshot,
            table_name_for_log,
            -1,
            /* use_table_fd */ false,
            base_path,
            partition_path,
            compression_method,
            format_settings,
            format_name,
            context,
            flags);
}
private:
const String path;
StorageMetadataPtr metadata_snapshot;
    String table_name_for_log;
    std::string base_path;
    CompressionMethod compression_method;
    std::string format_name;
    std::optional<FormatSettings> format_settings;
    ContextPtr context;
    int flags;
    std::unique_lock<std::shared_timed_mutex> lock;
};
SinkToStoragePtr StorageFile::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
{
    if (format_name == "Distributed")
        throw Exception("Method write is not implemented for Distributed format", ErrorCodes::NOT_IMPLEMENTED);

    int flags = 0;

    if (context->getSettingsRef().engine_file_truncate_on_insert)
        flags |= O_TRUNC;

    bool has_wildcards = path_for_partitioned_write.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
bool is_partitioned_implementation = insert_query && insert_query->partition_by && has_wildcards;
if (is_partitioned_implementation)
{
        if (path_for_partitioned_write.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write");

        fs::create_directories(fs::path(path_for_partitioned_write).parent_path());
return std::make_shared<PartitionedStorageFileSink>(
insert_query->partition_by,
metadata_snapshot,
            getStorageID().getNameForLogs(),
            std::unique_lock{rwlock, getLockTimeout(context)},
            base_path,
            path_for_partitioned_write,
            chooseCompressionMethod(path_for_partitioned_write, compression_method),
            format_settings,
            format_name,
            context,
            flags);
    }
else
{
        String path;
        if (!paths.empty())
        {
            if (is_path_with_globs)
                throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);

            path = paths.back();
            fs::create_directories(fs::path(path).parent_path());

std::error_code error_code;
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings)
&& fs::file_size(paths.back(), error_code) != 0 && !error_code)
{
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
{
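                    /// Insert an index before the extension: e.g. with "/data/out.csv" already
                    /// present, subsequent inserts create "/data/out.1.csv", "/data/out.2.csv", and so on.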
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
size_t index = paths.size();
String new_path;
do
{
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
++index;
}
while (fs::exists(new_path));
paths.push_back(new_path);
path = new_path;
}
else
throw Exception(
ErrorCodes::CANNOT_APPEND_TO_FILE,
"Cannot append data in format {} to file, because this format doesn't support appends."
" You can allow to create a new file "
"on each insert by enabling setting engine_file_allow_create_multiple_files",
format_name);
}
        }

return std::make_shared<StorageFileSink>(
metadata_snapshot,
            getStorageID().getNameForLogs(),
            std::unique_lock{rwlock, getLockTimeout(context)},
            table_fd,
            use_table_fd,
            base_path,
            path,
            chooseCompressionMethod(path, compression_method),
            format_settings,
            format_name,
            context,
flags);
}
}
bool StorageFile::storesDataOnDisk() const
{
return is_db_table;
}

Strings StorageFile::getDataPaths() const
{
    if (paths.empty())
        throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
    return paths;
}

void StorageFile::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{
    if (!is_db_table)
        throw Exception("Can't rename table " + getStorageID().getNameForLogs() + " bound to user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);

    if (paths.size() != 1)
        throw Exception("Can't rename table " + getStorageID().getNameForLogs() + " in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);

    std::string path_new = getTablePath(base_path + new_path_to_table_data, format_name);
    if (path_new == paths[0])
        return;

    fs::create_directories(fs::path(path_new).parent_path());
    fs::rename(paths[0], path_new);
    paths[0] = std::move(path_new);
    renameInMemory(new_table_id);
}

void StorageFile::truncate(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & /* metadata_snapshot */,
    ContextPtr /* context */,
    TableExclusiveLockHolder &)
{
    if (is_path_with_globs)
        throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);

    if (use_table_fd)
    {
        if (0 != ::ftruncate(table_fd, 0))
            throwFromErrno("Cannot truncate file at fd " + toString(table_fd), ErrorCodes::CANNOT_TRUNCATE_FILE);
    }
    else
    {
        for (const auto & path : paths)
        {
            if (!fs::exists(path))
                continue;

            if (0 != ::truncate(path.c_str(), 0))
                throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
        }
}
}
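
/// Registers the File table engine. Illustrative forms accepted by the argument parsing below:
///   ENGINE = File(CSV)                              -- data lives in the table's own directory
///   ENGINE = File(CSV, '/path/inside/user_files')   -- second argument: source path
///   ENGINE = File(CSV, stdin)                       -- or a standard file descriptor
///   ENGINE = File(CSV, 'data.csv.gz', 'gzip')       -- optional third argument: compression method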
void registerStorageFile(StorageFactory & factory)
{
StorageFactory::StorageFeatures storage_features{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
};
factory.registerStorage(
"File",
[](const StorageFactory::Arguments & factory_args)
{
StorageFile::CommonArguments storage_args
{
WithContext(factory_args.getContext()),
factory_args.table_id,
{},
{},
{},
factory_args.columns,
factory_args.constraints,
factory_args.comment,
};
ASTs & engine_args_ast = factory_args.engine_args;
if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT
throw Exception(
"Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.getLocalContext());
storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>();
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
if (factory_args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = factory_args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(
factory_args.storage_def->settings->changes);
storage_args.format_settings = getFormatSettings(
factory_args.getContext(), user_format_settings);
}
else
{
storage_args.format_settings = getFormatSettings(
factory_args.getContext());
}
if (engine_args_ast.size() == 1) /// Table in database
return std::make_shared<StorageFile>(factory_args.relative_data_path, storage_args);
/// Will use FD if engine_args[1] is int literal or identifier with std* name
int source_fd = -1;
String source_path;
if (auto opt_name = tryGetIdentifierName(engine_args_ast[1]))
{
if (*opt_name == "stdin")
source_fd = STDIN_FILENO;
else if (*opt_name == "stdout")
source_fd = STDOUT_FILENO;
else if (*opt_name == "stderr")
source_fd = STDERR_FILENO;
else
throw Exception(
"Unknown identifier '" + *opt_name + "' in second arg of File storage constructor", ErrorCodes::UNKNOWN_IDENTIFIER);
}
else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>())
{
auto type = literal->value.getType();
if (type == Field::Types::Int64)
source_fd = static_cast<int>(literal->value.get<Int64>());
else if (type == Field::Types::UInt64)
source_fd = static_cast<int>(literal->value.get<UInt64>());
else if (type == Field::Types::String)
source_path = literal->value.get<String>();
else
throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS);
}
if (engine_args_ast.size() == 3)
{
engine_args_ast[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[2], factory_args.getLocalContext());
storage_args.compression_method = engine_args_ast[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
storage_args.compression_method = "auto";
if (0 <= source_fd) /// File descriptor
return std::make_shared<StorageFile>(source_fd, storage_args);
else /// User's file
return std::make_shared<StorageFile>(source_path, factory_args.getContext()->getUserFilesPath(), storage_args);
},
storage_features);
}
NamesAndTypesList StorageFile::getVirtuals() const
{
    return NamesAndTypesList{
        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & StorageFile::getSchemaCache()
{
static SchemaCache schema_cache;
return schema_cache;
}
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
{
/// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache();
struct stat file_stat{};
for (const auto & path : paths)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(path.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtim.tv_sec;
};
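
        /// A cached schema is only used while the file's modification time is unchanged;
        /// a newer mtime invalidates the entry and schema inference runs again.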
String cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageFile::addColumnsToCache(
const Strings & paths,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds());
}
}