fix style

This commit is contained in:
Nikita Keba 2023-05-29 20:08:18 +00:00
parent 8cf79cdb6c
commit c18bff58b3
7 changed files with 445 additions and 396 deletions

2
contrib/libarchive vendored

@ -1 +1 @@
Subproject commit 1f3c62ebf4d492ac21d3099b3b064993100dd997 Subproject commit 30a8610f4d05141d85bb9b123cdec16906a02c59

View File

@ -3,31 +3,36 @@
#include <Common/quoteString.h> #include <Common/quoteString.h>
namespace DB{ namespace DB
{
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_UNPACK_ARCHIVE; extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int SEEK_POSITION_OUT_OF_BOUND;
} }
class SevenZipArchiveReader::Handle { class SevenZipArchiveReader::Handle
{
public: public:
Handle(const String & path_to_archive_) Handle(const String & path_to_archive_) : path_to_archive(path_to_archive_)
: path_to_archive(path_to_archive_) { {
archive = archive_read_new(); archive = archive_read_new();
archive_read_support_filter_all(archive); archive_read_support_filter_all(archive);
archive_read_support_format_all(archive); archive_read_support_format_all(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK) { if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
{
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open 7z archive {}", quoteString(path_to_archive)); throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open 7z archive {}", quoteString(path_to_archive));
} }
entry = archive_entry_new(); entry = archive_entry_new();
} }
~Handle() { ~Handle()
{
archive_read_close(archive); archive_read_close(archive);
archive_read_free(archive); archive_read_free(archive);
} }
bool locateFile(const String &filename) { bool locateFile(const String & filename)
{
while (archive_read_next_header(archive, &entry) == ARCHIVE_OK) while (archive_read_next_header(archive, &entry) == ARCHIVE_OK)
{ {
if (archive_entry_pathname(entry) == filename) if (archive_entry_pathname(entry) == filename)
@ -47,7 +52,10 @@ class SevenZipArchiveReader::ReadBufferFromSevenZipArchive : public ReadBufferFr
{ {
public: public:
explicit ReadBufferFromSevenZipArchive(const String & path_to_archive_, const String & filename_) explicit ReadBufferFromSevenZipArchive(const String & path_to_archive_, const String & filename_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0), handle(path_to_archive_), path_to_archive(path_to_archive_), filename(filename_) : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(path_to_archive_)
, path_to_archive(path_to_archive_)
, filename(filename_)
{ {
handle.locateFile(filename_); handle.locateFile(filename_);
} }
@ -87,10 +95,7 @@ public:
return new_pos; return new_pos;
} }
off_t getPosition() override off_t getPosition() override { return archive_entry_size(handle.entry) - available(); }
{
return archive_entry_size(handle.entry) - available();
}
String getFileName() const override { return filename; } String getFileName() const override { return filename; }
@ -112,21 +117,27 @@ private:
const String filename; const String filename;
}; };
SevenZipArchiveReader::SevenZipArchiveReader(const String & path_to_archive_) SevenZipArchiveReader::SevenZipArchiveReader(const String & path_to_archive_) : path_to_archive(path_to_archive_)
: path_to_archive(path_to_archive_) { {
} }
SevenZipArchiveReader::SevenZipArchiveReader(const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_): path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), archive_size(archive_size_) {} SevenZipArchiveReader::SevenZipArchiveReader(
const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_)
: path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), archive_size(archive_size_)
{
}
SevenZipArchiveReader::~SevenZipArchiveReader() {} SevenZipArchiveReader::~SevenZipArchiveReader()
{
}
bool SevenZipArchiveReader::fileExists(const String & filename) bool SevenZipArchiveReader::fileExists(const String & filename)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive);
return handle.locateFile(filename); return handle.locateFile(filename);
} }
SevenZipArchiveReader::FileInfo SevenZipArchiveReader::getFileInfo(const String & filename) { SevenZipArchiveReader::FileInfo SevenZipArchiveReader::getFileInfo(const String & filename)
{
Handle handle(path_to_archive); Handle handle(path_to_archive);
handle.locateFile(filename); handle.locateFile(filename);
@ -138,28 +149,33 @@ private:
return info; return info;
} }
std::unique_ptr<SevenZipArchiveReader::FileEnumerator> SevenZipArchiveReader::firstFile() { std::unique_ptr<SevenZipArchiveReader::FileEnumerator> SevenZipArchiveReader::firstFile()
{
return nullptr; return nullptr;
} }
std::unique_ptr<ReadBufferFromFileBase> SevenZipArchiveReader::readFile(const String & filename) { std::unique_ptr<ReadBufferFromFileBase> SevenZipArchiveReader::readFile(const String & filename)
{
Handle handle(path_to_archive); Handle handle(path_to_archive);
handle.locateFile(filename); handle.locateFile(filename);
return std::make_unique<ReadBufferFromSevenZipArchive>(path_to_archive, filename); return std::make_unique<ReadBufferFromSevenZipArchive>(path_to_archive, filename);
} }
std::unique_ptr<ReadBufferFromFileBase> SevenZipArchiveReader::readFile([[maybe_unused]] std::unique_ptr<FileEnumerator> enumerator) { std::unique_ptr<ReadBufferFromFileBase> SevenZipArchiveReader::readFile([[maybe_unused]] std::unique_ptr<FileEnumerator> enumerator)
{
return nullptr; return nullptr;
} }
std::unique_ptr<SevenZipArchiveReader::FileEnumerator> SevenZipArchiveReader::nextFile([[maybe_unused]] std::unique_ptr<ReadBuffer> read_buffer) { std::unique_ptr<SevenZipArchiveReader::FileEnumerator>
SevenZipArchiveReader::nextFile([[maybe_unused]] std::unique_ptr<ReadBuffer> read_buffer)
{
return nullptr; return nullptr;
} }
void SevenZipArchiveReader::setPassword([[maybe_unused]] const String & password_) { void SevenZipArchiveReader::setPassword([[maybe_unused]] const String & password_)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to .7z archive"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to .7z archive");
} }

View File

@ -2,9 +2,9 @@
#include <IO/Archives/IArchiveReader.h> #include <IO/Archives/IArchiveReader.h>
#include <iostream>
#include <archive.h> #include <archive.h>
#include <archive_entry.h> #include <archive_entry.h>
#include <iostream>
namespace DB namespace DB
@ -17,7 +17,6 @@ class SeekableReadBuffer;
class SevenZipArchiveReader : public IArchiveReader class SevenZipArchiveReader : public IArchiveReader
{ {
public: public:
/// Constructs an archive's reader that will read from a file in the local filesystem. /// Constructs an archive's reader that will read from a file in the local filesystem.
explicit SevenZipArchiveReader(const String & path_to_archive_); explicit SevenZipArchiveReader(const String & path_to_archive_);
@ -49,7 +48,6 @@ public:
void setPassword([[maybe_unused]] const String & password_) override; void setPassword([[maybe_unused]] const String & password_) override;
private: private:
class ReadBufferFromSevenZipArchive; class ReadBufferFromSevenZipArchive;
class Handle; class Handle;

View File

@ -3,31 +3,36 @@
#include <Common/quoteString.h> #include <Common/quoteString.h>
namespace DB{ namespace DB
{
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_UNPACK_ARCHIVE; extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int SEEK_POSITION_OUT_OF_BOUND;
} }
class TarArchiveReader::Handle { class TarArchiveReader::Handle
{
public: public:
Handle(const String & path_to_archive_) Handle(const String & path_to_archive_) : path_to_archive(path_to_archive_)
: path_to_archive(path_to_archive_) { {
archive = archive_read_new(); archive = archive_read_new();
archive_read_support_filter_all(archive); archive_read_support_filter_all(archive);
archive_read_support_format_all(archive); archive_read_support_format_all(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK) { if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
{
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open tar archive {}", quoteString(path_to_archive)); throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open tar archive {}", quoteString(path_to_archive));
} }
entry = archive_entry_new(); entry = archive_entry_new();
} }
~Handle() { ~Handle()
{
archive_read_close(archive); archive_read_close(archive);
archive_read_free(archive); archive_read_free(archive);
} }
bool locateFile(const String &filename) { bool locateFile(const String & filename)
{
while (archive_read_next_header(archive, &entry) == ARCHIVE_OK) while (archive_read_next_header(archive, &entry) == ARCHIVE_OK)
{ {
if (archive_entry_pathname(entry) == filename) if (archive_entry_pathname(entry) == filename)
@ -47,7 +52,10 @@ class TarArchiveReader::ReadBufferFromTarArchive : public ReadBufferFromFileBase
{ {
public: public:
explicit ReadBufferFromTarArchive(const String & path_to_archive_, const String & filename_) explicit ReadBufferFromTarArchive(const String & path_to_archive_, const String & filename_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0), handle(path_to_archive_), path_to_archive(path_to_archive_), filename(filename_) : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(path_to_archive_)
, path_to_archive(path_to_archive_)
, filename(filename_)
{ {
handle.locateFile(filename_); handle.locateFile(filename_);
} }
@ -87,10 +95,7 @@ public:
return new_pos; return new_pos;
} }
off_t getPosition() override off_t getPosition() override { return archive_entry_size(handle.entry) - available(); }
{
return archive_entry_size(handle.entry) - available();
}
String getFileName() const override { return filename; } String getFileName() const override { return filename; }
@ -112,20 +117,27 @@ private:
const String filename; const String filename;
}; };
TarArchiveReader::TarArchiveReader(const String & path_to_archive_) TarArchiveReader::TarArchiveReader(const String & path_to_archive_) : path_to_archive(path_to_archive_)
: path_to_archive(path_to_archive_) {} {
}
TarArchiveReader::TarArchiveReader(const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_): path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), archive_size(archive_size_) {} TarArchiveReader::TarArchiveReader(
const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_)
: path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), archive_size(archive_size_)
{
}
TarArchiveReader::~TarArchiveReader() {} TarArchiveReader::~TarArchiveReader()
{
}
bool TarArchiveReader::fileExists(const String & filename) bool TarArchiveReader::fileExists(const String & filename)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive);
return handle.locateFile(filename); return handle.locateFile(filename);
} }
TarArchiveReader::FileInfo TarArchiveReader::getFileInfo(const String & filename) { TarArchiveReader::FileInfo TarArchiveReader::getFileInfo(const String & filename)
{
Handle handle(path_to_archive); Handle handle(path_to_archive);
handle.locateFile(filename); handle.locateFile(filename);
@ -137,28 +149,32 @@ private:
return info; return info;
} }
std::unique_ptr<TarArchiveReader::FileEnumerator> TarArchiveReader::firstFile() { std::unique_ptr<TarArchiveReader::FileEnumerator> TarArchiveReader::firstFile()
{
return nullptr; return nullptr;
} }
std::unique_ptr<ReadBufferFromFileBase> TarArchiveReader::readFile(const String & filename) { std::unique_ptr<ReadBufferFromFileBase> TarArchiveReader::readFile(const String & filename)
{
Handle handle(path_to_archive); Handle handle(path_to_archive);
handle.locateFile(filename); handle.locateFile(filename);
return std::make_unique<ReadBufferFromTarArchive>(path_to_archive, filename); return std::make_unique<ReadBufferFromTarArchive>(path_to_archive, filename);
} }
std::unique_ptr<ReadBufferFromFileBase> TarArchiveReader::readFile([[maybe_unused]] std::unique_ptr<FileEnumerator> enumerator) { std::unique_ptr<ReadBufferFromFileBase> TarArchiveReader::readFile([[maybe_unused]] std::unique_ptr<FileEnumerator> enumerator)
{
return nullptr; return nullptr;
} }
std::unique_ptr<TarArchiveReader::FileEnumerator> TarArchiveReader::nextFile([[maybe_unused]] std::unique_ptr<ReadBuffer> read_buffer) { std::unique_ptr<TarArchiveReader::FileEnumerator> TarArchiveReader::nextFile([[maybe_unused]] std::unique_ptr<ReadBuffer> read_buffer)
{
return nullptr; return nullptr;
} }
void TarArchiveReader::setPassword([[maybe_unused]] const String & password_) { void TarArchiveReader::setPassword([[maybe_unused]] const String & password_)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to .tar archive"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to .tar archive");
} }

View File

@ -47,7 +47,6 @@ public:
void setPassword([[maybe_unused]] const String & password_) override; void setPassword([[maybe_unused]] const String & password_) override;
private: private:
class ReadBufferFromTarArchive; class ReadBufferFromTarArchive;
class Handle; class Handle;

View File

@ -1,11 +1,11 @@
#include <Storages/StorageFile.h>
#include <Storages/StorageFactory.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/PartitionedSink.h>
#include <Storages/Distributed/DirectoryMonitor.h> #include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/PartitionedSink.h>
#include <Storages/ReadFromStorageProgress.h> #include <Storages/ReadFromStorageProgress.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
@ -28,30 +28,30 @@
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h> #include <Formats/ReadSchemaUtils.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/ISource.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <Common/filesystemHelpers.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/filesystemHelpers.h>
#include <Common/parseGlobs.h>
#include <Common/typeid_cast.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <sys/stat.h> #include <filesystem>
#include <shared_mutex>
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <re2/re2.h> #include <re2/re2.h>
#include <filesystem> #include <sys/stat.h>
#include <shared_mutex>
namespace ProfileEvents namespace ProfileEvents
@ -110,8 +110,7 @@ void listFilesWithRegexpMatchingImpl(
re2::RE2 matcher(regexp); re2::RE2 matcher(regexp);
if (!matcher.ok()) if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", for_match, matcher.error());
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
bool skip_regex = current_glob == "/*" ? true : false; bool skip_regex = current_glob == "/*" ? true : false;
if (!recursive) if (!recursive)
@ -143,23 +142,25 @@ void listFilesWithRegexpMatchingImpl(
{ {
if (recursive) if (recursive)
{ {
listFilesWithRegexpMatchingImpl(fs::path(full_path).append(it->path().string()) / "" , listFilesWithRegexpMatchingImpl(
fs::path(full_path).append(it->path().string()) / "",
looking_for_directory ? suffix_with_globs.substr(next_slash) : current_glob, looking_for_directory ? suffix_with_globs.substr(next_slash) : current_glob,
total_bytes_to_read, result, recursive); total_bytes_to_read,
result,
recursive);
} }
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher)) else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
{ {
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check. /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result); listFilesWithRegexpMatchingImpl(
fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result);
} }
} }
} }
} }
std::vector<std::string> listFilesWithRegexpMatching( std::vector<std::string>
const std::string & path_for_ls, listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match, size_t & total_bytes_to_read)
const std::string & for_match,
size_t & total_bytes_to_read)
{ {
std::vector<std::string> result; std::vector<std::string> result;
listFilesWithRegexpMatchingImpl(path_for_ls, for_match, total_bytes_to_read, result); listFilesWithRegexpMatchingImpl(path_for_ls, for_match, total_bytes_to_read, result);
@ -173,10 +174,7 @@ std::string getTablePath(const std::string & table_dir_path, const std::string &
/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..'). /// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
void checkCreationIsAllowed( void checkCreationIsAllowed(
ContextPtr context_global, ContextPtr context_global, const std::string & db_dir_path, const std::string & table_path, bool can_be_directory)
const std::string & db_dir_path,
const std::string & table_path,
bool can_be_directory)
{ {
if (context_global->getApplicationType() != Context::ApplicationType::SERVER) if (context_global->getApplicationType() != Context::ApplicationType::SERVER)
return; return;
@ -193,12 +191,8 @@ void checkCreationIsAllowed(
} }
} }
std::unique_ptr<ReadBuffer> selectReadBuffer( std::unique_ptr<ReadBuffer>
const String & current_path, selectReadBuffer(const String & current_path, bool use_table_fd, int table_fd, const struct stat & file_stat, ContextPtr context)
bool use_table_fd,
int table_fd,
const struct stat & file_stat,
ContextPtr context)
{ {
auto read_method = context->getSettingsRef().storage_file_read_method; auto read_method = context->getSettingsRef().storage_file_read_method;
@ -255,8 +249,11 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
{ {
CompressionMethod method; CompressionMethod method;
struct stat file_stat{}; struct stat file_stat
if (path_to_archive != "auto") { {
};
if (path_to_archive != "auto")
{
auto reader = createArchiveReader(path_to_archive); auto reader = createArchiveReader(path_to_archive);
std::unique_ptr<ReadBuffer> in = reader->readFile(current_path); std::unique_ptr<ReadBuffer> in = reader->readFile(current_path);
return in; return in;
@ -295,7 +292,8 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
} }
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read) Strings
StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
{ {
fs::path user_files_absolute_path = fs::weakly_canonical(user_files_path); fs::path user_files_absolute_path = fs::weakly_canonical(user_files_path);
fs::path fs_table_path(table_path); fs::path fs_table_path(table_path);
@ -385,14 +383,16 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
throw Exception( throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. " "Cannot extract table structure from {} format file, because there are no files with provided path. "
"You must specify table structure manually", format); "You must specify table structure manually",
format);
std::optional<ColumnsDescription> columns_from_cache; std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file) if (context->getSettingsRef().schema_inference_use_cache_for_file)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context); columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
ReadBufferIterator read_buffer_iterator; ReadBufferIterator read_buffer_iterator;
if (paths_to_archive.empty()) { if (paths_to_archive.empty())
{
read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer> read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == paths.end()) if (it == paths.end())
@ -400,7 +400,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return createReadBuffer(*it++, false, "File", -1, compression_method, context); return createReadBuffer(*it++, false, "File", -1, compression_method, context);
}; };
} else { }
else
{
read_buffer_iterator = [&, it = paths_to_archive.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer> read_buffer_iterator = [&, it = paths_to_archive.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == paths_to_archive.end()) if (it == paths_to_archive.end())
@ -426,8 +428,7 @@ bool StorageFile::supportsSubsetOfColumns() const
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name); return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
} }
StorageFile::StorageFile(int table_fd_, CommonArguments args) StorageFile::StorageFile(int table_fd_, CommonArguments args) : StorageFile(args)
: StorageFile(args)
{ {
struct stat buf; struct stat buf;
int res = fstat(table_fd_, &buf); int res = fstat(table_fd_, &buf);
@ -446,13 +447,15 @@ StorageFile::StorageFile(int table_fd_, CommonArguments args)
setStorageMetadata(args); setStorageMetadata(args);
} }
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args) StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args) : StorageFile(args)
: StorageFile(args) {
if (args.path_to_archive != "auto")
{ {
if (args.path_to_archive != "auto") {
paths_to_archive = getPathsList(args.path_to_archive, user_files_path, args.getContext(), total_bytes_to_read); paths_to_archive = getPathsList(args.path_to_archive, user_files_path, args.getContext(), total_bytes_to_read);
paths = {table_path_}; paths = {table_path_};
} else { }
else
{
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read); paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
} }
is_db_table = false; is_db_table = false;
@ -465,8 +468,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
setStorageMetadata(args); setStorageMetadata(args);
} }
StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args) StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args) : StorageFile(args)
: StorageFile(args)
{ {
if (relative_table_dir_path.empty()) if (relative_table_dir_path.empty())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Storage {} requires data path", getName()); throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Storage {} requires data path", getName());
@ -507,7 +509,8 @@ void StorageFile::setStorageMetadata(CommonArguments args)
columns = getTableStructureFromFileDescriptor(args.getContext()); columns = getTableStructureFromFileDescriptor(args.getContext());
else else
{ {
columns = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext(), paths_to_archive); columns
= getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext(), paths_to_archive);
if (!args.columns.empty() && args.columns != columns) if (!args.columns.empty() && args.columns != columns)
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Table structure and file structure are different"); throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Table structure and file structure are different");
} }
@ -600,10 +603,7 @@ public:
} }
} }
String getName() const override String getName() const override { return storage->getName(); }
{
return storage->getName();
}
Chunk generate() override Chunk generate() override
{ {
@ -615,13 +615,16 @@ public:
if (!storage->use_table_fd) if (!storage->use_table_fd)
{ {
size_t current_file = 0, current_archive = 0; size_t current_file = 0, current_archive = 0;
if (files_info->files.size() == 1 && !files_info->paths_to_archive.empty()) { if (files_info->files.size() == 1 && !files_info->paths_to_archive.empty())
{
current_archive = files_info->next_archive_to_read.fetch_add(1); current_archive = files_info->next_archive_to_read.fetch_add(1);
if (current_archive >= files_info->paths_to_archive.size()) if (current_archive >= files_info->paths_to_archive.size())
return {}; return {};
current_path = files_info->files[current_file]; current_path = files_info->files[current_file];
current_archive_path = files_info->paths_to_archive[current_archive]; current_archive_path = files_info->paths_to_archive[current_archive];
} else { }
else
{
current_file = files_info->next_file_to_read.fetch_add(1); current_file = files_info->next_file_to_read.fetch_add(1);
if (current_file >= files_info->files.size()) if (current_file >= files_info->files.size())
return {}; return {};
@ -636,11 +639,28 @@ public:
} }
} }
if (!read_buf) { if (!read_buf)
if (files_info->paths_to_archive.empty()) { {
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context); if (files_info->paths_to_archive.empty())
} else { {
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context, current_archive_path); read_buf = createReadBuffer(
current_path,
storage->use_table_fd,
storage->getName(),
storage->table_fd,
storage->compression_method,
context);
}
else
{
read_buf = createReadBuffer(
current_path,
storage->use_table_fd,
storage->getName(),
storage->table_fd,
storage->compression_method,
context,
current_archive_path);
} }
} }
auto format auto format
@ -651,10 +671,9 @@ public:
if (columns_description.hasDefaults()) if (columns_description.hasDefaults())
{ {
builder.addSimpleTransform([&](const Block & header) builder.addSimpleTransform(
{ [&](const Block & header)
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *format, context); { return std::make_shared<AddingDefaultsTransform>(header, columns_description, *format, context); });
});
} }
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder))); pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
@ -679,14 +698,20 @@ public:
size_t last_slash_pos = current_path.find_last_of('/'); size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1); auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name)); auto column
= DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
chunk.addColumn(column->convertToFullColumnIfConst()); chunk.addColumn(column->convertToFullColumnIfConst());
} }
if (num_rows) if (num_rows)
{ {
updateRowsProgressApprox( updateRowsProgressApprox(
*this, chunk, files_info->total_bytes_to_read, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max); *this,
chunk,
files_info->total_bytes_to_read,
total_rows_approx_accumulated,
total_rows_count_times,
total_rows_approx_max);
} }
return chunk; return chunk;
} }
@ -793,14 +818,14 @@ Pipe StorageFile::read(
const auto & virtuals = getVirtuals(); const auto & virtuals = getVirtuals();
std::erase_if( std::erase_if(
fetch_columns, fetch_columns,
[&](const String & col) [&](const String & col) {
{
return std::any_of( return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; }); virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
}); });
if (fetch_columns.empty()) if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name); fetch_columns.push_back(
ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns); columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
} }
else else
@ -914,8 +939,8 @@ public:
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3); write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, writer = FormatFactory::instance().getOutputFormatParallelIfPossible(
*write_buf, metadata_snapshot->getSampleBlock(), context, format_settings); format_name, *write_buf, metadata_snapshot->getSampleBlock(), context, format_settings);
if (do_not_write_prefix) if (do_not_write_prefix)
writer->doNotWritePrefix(); writer->doNotWritePrefix();
@ -1056,10 +1081,7 @@ private:
}; };
SinkToStoragePtr StorageFile::write( SinkToStoragePtr StorageFile::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
{ {
if (format_name == "Distributed") if (format_name == "Distributed")
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not implemented for Distributed format"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not implemented for Distributed format");
@ -1099,7 +1121,8 @@ SinkToStoragePtr StorageFile::write(
if (!paths.empty()) if (!paths.empty())
{ {
if (is_path_with_globs) if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"Table '{}' is in readonly mode because of globs in filepath", "Table '{}' is in readonly mode because of globs in filepath",
getStorageID().getNameForLogs()); getStorageID().getNameForLogs());
@ -1118,10 +1141,10 @@ SinkToStoragePtr StorageFile::write(
String new_path; String new_path;
do do
{ {
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos)); new_path = paths[0].substr(0, pos) + "." + std::to_string(index)
+ (pos == std::string::npos ? "" : paths[0].substr(pos));
++index; ++index;
} } while (fs::exists(new_path));
while (fs::exists(new_path));
paths.push_back(new_path); paths.push_back(new_path);
path = new_path; path = new_path;
} }
@ -1166,8 +1189,10 @@ Strings StorageFile::getDataPaths() const
void StorageFile::rename(const String & new_path_to_table_data, const StorageID & new_table_id) void StorageFile::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{ {
if (!is_db_table) if (!is_db_table)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(
"Can't rename table {} bounded to user-defined file (or FD)", getStorageID().getNameForLogs()); ErrorCodes::DATABASE_ACCESS_DENIED,
"Can't rename table {} bounded to user-defined file (or FD)",
getStorageID().getNameForLogs());
if (paths.size() != 1) if (paths.size() != 1)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't rename table {} in readonly mode", getStorageID().getNameForLogs()); throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't rename table {} in readonly mode", getStorageID().getNameForLogs());
@ -1184,10 +1209,7 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID
} }
void StorageFile::truncate( void StorageFile::truncate(
const ASTPtr & /*query*/, const ASTPtr & /*query*/, const StorageMetadataPtr & /* metadata_snapshot */, ContextPtr /* context */, TableExclusiveLockHolder &)
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
TableExclusiveLockHolder &)
{ {
if (is_path_with_globs) if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't truncate table '{}' in readonly mode", getStorageID().getNameForLogs()); throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Can't truncate table '{}' in readonly mode", getStorageID().getNameForLogs());
@ -1223,8 +1245,7 @@ void registerStorageFile(StorageFactory & factory)
"File", "File",
[](const StorageFactory::Arguments & factory_args) [](const StorageFactory::Arguments & factory_args)
{ {
StorageFile::CommonArguments storage_args StorageFile::CommonArguments storage_args{
{
WithContext(factory_args.getContext()), WithContext(factory_args.getContext()),
factory_args.table_id, factory_args.table_id,
{}, {},
@ -1238,7 +1259,8 @@ void registerStorageFile(StorageFactory & factory)
ASTs & engine_args_ast = factory_args.engine_args; ASTs & engine_args_ast = factory_args.engine_args;
if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage File requires from 1 to 3 arguments: " "Storage File requires from 1 to 3 arguments: "
"name of used format, source and compression_method."); "name of used format, source and compression_method.");
@ -1264,16 +1286,13 @@ void registerStorageFile(StorageFactory & factory)
} }
// Apply changes from SETTINGS clause, with validation. // Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges( user_format_settings.applyChanges(factory_args.storage_def->settings->changes);
factory_args.storage_def->settings->changes);
storage_args.format_settings = getFormatSettings( storage_args.format_settings = getFormatSettings(factory_args.getContext(), user_format_settings);
factory_args.getContext(), user_format_settings);
} }
else else
{ {
storage_args.format_settings = getFormatSettings( storage_args.format_settings = getFormatSettings(factory_args.getContext());
factory_args.getContext());
} }
if (engine_args_ast.size() == 1) /// Table in database if (engine_args_ast.size() == 1) /// Table in database
@ -1292,8 +1311,8 @@ void registerStorageFile(StorageFactory & factory)
else if (*opt_name == "stderr") else if (*opt_name == "stderr")
source_fd = STDERR_FILENO; source_fd = STDERR_FILENO;
else else
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier '{}' in second arg of File storage constructor", throw Exception(
*opt_name); ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier '{}' in second arg of File storage constructor", *opt_name);
} }
else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>()) else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>())
{ {
@ -1334,7 +1353,8 @@ NamesAndTypesList StorageFile::getVirtuals() const
SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context) SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
{ {
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS)); static SchemaCache schema_cache(
context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache; return schema_cache;
} }
@ -1343,7 +1363,9 @@ std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
{ {
/// Check if the cache contains one of the paths. /// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache(context); auto & schema_cache = getSchemaCache(context);
struct stat file_stat{}; struct stat file_stat
{
};
for (const auto & path : paths) for (const auto & path : paths)
{ {
auto get_last_mod_time = [&]() -> std::optional<time_t> auto get_last_mod_time = [&]() -> std::optional<time_t>

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h> #include <Storages/Cache/SchemaCache.h>
#include <Storages/IStorage.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -51,10 +51,7 @@ public:
size_t max_block_size, size_t max_block_size,
size_t num_streams) override; size_t num_streams) override;
SinkToStoragePtr write( SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
const ASTPtr & query,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr context) override;
void truncate( void truncate(
const ASTPtr & /*query*/, const ASTPtr & /*query*/,
@ -69,7 +66,8 @@ public:
NamesAndTypesList getVirtuals() const override; NamesAndTypesList getVirtuals() const override;
static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read); static Strings
getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read);
/// Check if the format supports reading only some subset of columns. /// Check if the format supports reading only some subset of columns.
/// It is useful because such formats could effectively skip unknown columns /// It is useful because such formats could effectively skip unknown columns