ClickHouse/src/Storages/StorageFile.cpp


#include <Storages/StorageFile.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <fcntl.h>
#include <unistd.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <re2/re2.h>
#include <filesystem>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>
#include <Common/UnicodeBar.h>
#include <Common/TerminalSize.h>

namespace fs = std::filesystem;

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_TRUNCATE_FILE;
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME;
extern const int FILE_DOESNT_EXIST;
extern const int TIMEOUT_EXCEEDED;
extern const int INCOMPATIBLE_COLUMNS;
}

namespace
{
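
/// Illustrative example (hypothetical paths): with path_for_ls = "/" and for_match = "/data/logs_*.csv",
/// the non-glob prefix "/data/" is listed, the glob component is turned into a regexp by
/// makeRegexpPatternFromGlobs, and entries such as "/data/logs_1.csv" are returned while
/// "/data/other.csv" is not; a '/' after the glob component triggers recursion into matching directories.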
/* Recursive directory listing with matched paths as a result.
 * The same method exists in StorageHDFS.
 */
std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match, size_t & total_bytes_to_read)
{
const size_t first_glob = for_match.find_first_of("*?{");

const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begins with '/'

const size_t next_slash = suffix_with_globs.find('/', 1);
auto regexp = makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash));
re2::RE2 matcher(regexp);

std::vector<std::string> result;
const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);
if (!fs::exists(fs::path(prefix_without_globs)))
{
return result;
}

const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;

/// is_directory tells whether the current ls entry is a file or a directory.
if (!fs::is_directory(it->path()) && !looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
total_bytes_to_read += fs::file_size(it->path());
result.push_back(it->path().string());
}
}
else if (fs::is_directory(it->path()) && looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
/// Recursion depth is limited by the pattern: '*' works only for depth 1; for depth 2 the pattern is '*/*'. So no additional check is needed.
Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash), total_bytes_to_read);
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
}
return result;
}
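
/// Path of the data file for a table that lives inside the database directory: "<table_dir>/data.<format>".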
std::string getTablePath(const std::string & table_dir_path, const std::string & format_name)
{
return table_dir_path + "/data." + escapeForFileName(format_name);
}

/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
void checkCreationIsAllowed(ContextPtr context_global, const std::string & db_dir_path, const std::string & table_path)
{
if (context_global->getApplicationType() != Context::ApplicationType::SERVER)
return;

/// "/dev/null" is allowed for perf testing
if (!startsWith(table_path, db_dir_path) && table_path != "/dev/null")
throw Exception("File is not inside " + db_dir_path, ErrorCodes::DATABASE_ACCESS_DENIED);

Poco::File table_path_poco_file = Poco::File(table_path);
if (table_path_poco_file.exists() && table_path_poco_file.isDirectory())
throw Exception("File must not be a directory", ErrorCodes::INCORRECT_FILE_NAME);
}

}

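/// Resolves the user-supplied path (possibly containing globs) relative to user_files_path,
/// collects all matching files, accumulates their total size into total_bytes_to_read,
/// and checks that every resulting path is allowed.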
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read)
{
String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
Poco::Path poco_path = Poco::Path(table_path);
if (poco_path.isRelative())
poco_path = Poco::Path(user_files_absolute_path, poco_path);
Strings paths;
const String path = poco_path.absolute().toString();
if (path.find_first_of("*?{") == std::string::npos)
{
total_bytes_to_read += fs::file_size(path);
paths.push_back(path);
}
else
paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
for (const auto & cur_path : paths)
checkCreationIsAllowed(context, user_files_absolute_path, cur_path);
return paths;
}
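
/// Column-oriented formats (for example Parquet) can read only the requested subset of columns,
/// so the source is built with a narrowed header instead of the full sample block.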
bool StorageFile::isColumnOriented() const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatIsColumnOriented(format_name);
}
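
/// StorageFile can be constructed in three ways: from an explicit file descriptor (not allowed
/// for the server), from a user-specified path that may contain globs, or as a regular table
/// stored inside the database directory.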
StorageFile::StorageFile(int table_fd_, CommonArguments args)
: StorageFile(args)
{
if (args.getContext()->getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);

is_db_table = false;
use_table_fd = true;
table_fd = table_fd_;

/// Save initial offset, it will be used for repeating SELECTs
/// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
table_fd_init_offset = lseek(table_fd, 0, SEEK_CUR);
}

StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
: StorageFile(args)
{
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);

if (args.format_name == "Distributed")
{
if (paths.empty())
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);

auto & first_path = paths[0];
Block header = StorageDistributedDirectoryMonitor::createStreamFromFile(first_path)->getHeader();

StorageInMemoryMetadata storage_metadata;
auto columns = ColumnsDescription(header.getNamesAndTypesList());
if (!args.columns.empty() && columns != args.columns)
throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
storage_metadata.setColumns(columns);
setInMemoryMetadata(storage_metadata);
}
}

StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args)
: StorageFile(args)
{
if (relative_table_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);

String table_dir_path = base_path + relative_table_dir_path + "/";
Poco::File(table_dir_path).createDirectories();
paths = {getTablePath(table_dir_path, format_name)};
}

StorageFile::StorageFile(CommonArguments args)
: IStorage(args.table_id)
, format_name(args.format_name)
, format_settings(args.format_settings)
, compression_method(args.compression_method)
, base_path(args.getContext()->getPath())
{
StorageInMemoryMetadata storage_metadata;

if (args.format_name != "Distributed")
storage_metadata.setColumns(args.columns);

storage_metadata.setConstraints(args.constraints);
setInMemoryMetadata(storage_metadata);
}
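
/// The rwlock timeout is bounded by both lock_acquire_timeout and max_execution_time,
/// so a query does not wait for the table lock longer than it is allowed to run.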
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
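
/// Source that reads the files (or the file descriptor) of a StorageFile.
/// Several sources may run in parallel; each takes the next file to read from the shared
/// atomic counter FilesInfo::next_file_to_read.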
using StorageFilePtr = std::shared_ptr<StorageFile>;

class StorageFileSource : public SourceWithProgress
{
public:
struct FilesInfo
{
std::vector<std::string> files;
std::atomic<size_t> next_file_to_read = 0;
bool need_path_column = false;
bool need_file_column = false;
};

using FilesInfoPtr = std::shared_ptr<FilesInfo>;

static Block getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{
auto header = metadata_snapshot->getSampleBlock();

/// Note: AddingDefaultsBlockInputStream doesn't change header.
if (need_path_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (need_file_column)
header.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});

return header;
}

static Block getBlockForSource(
const StorageFilePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
const ColumnsDescription & columns_description,
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
else
return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column);
}

StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_)
: SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, files_info_))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
, columns_description(std::move(columns_description_))
, context(context_)
, max_block_size(max_block_size_)
{
if (storage->use_table_fd)
{
unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context));
if (!unique_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported.
if (storage->table_fd_was_used) /// We need to seek to the initial position
{
if (storage->table_fd_init_offset < 0)
throw Exception("File descriptor isn't seekable, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
/// ReadBuffer's seek() doesn't make sense, since cache is empty
if (lseek(storage->table_fd, storage->table_fd_init_offset, SEEK_SET) < 0)
throwFromErrno("Cannot seek file descriptor, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
storage->table_fd_was_used = true;
}
else
{
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
if (!shared_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
String getName() const override
{
return storage->getName();
}
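
/// Returns the next chunk; when the current file is exhausted, switches to the next one.
/// An empty chunk means the source is finished.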
Chunk generate() override
{
while (!finished_generate)
{
/// Open file lazily on first read. This is needed to avoid too many open files from different streams.
if (!reader)
{
if (!storage->use_table_fd)
{
auto current_file = files_info->next_file_to_read.fetch_add(1);
if (current_file >= files_info->files.size())
return {};
current_path = files_info->files[current_file];
/// Special case for distributed format. Defaults are not needed here.
if (storage->format_name == "Distributed")
{
reader = StorageDistributedDirectoryMonitor::createStreamFromFile(current_path);
continue;
}
}
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
if (storage->use_table_fd)
{
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
method = chooseCompressionMethod("", storage->compression_method);
}
else
{
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path);
method = chooseCompressionMethod(current_path, storage->compression_method);
}
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);

auto get_block_for_format = [&]() -> Block
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
return metadata_snapshot->getSampleBlock();
};

auto format = FormatFactory::instance().getInput(
storage->format_name, *read_buf, get_block_for_format(), context, max_block_size, storage->format_settings);

reader = std::make_shared<InputStreamFromInputFormat>(format);

if (columns_description.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
reader->readPrefix();
}
if (auto res = reader->read())
{
Columns columns = res.getColumns();
UInt64 num_rows = res.rows();
/// Enrich with virtual columns.
if (files_info->need_path_column)
{
auto column = DataTypeString().createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
if (files_info->need_file_column)
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeString().createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
return Chunk(std::move(columns), num_rows);
}
/// Read only once for file descriptor.
if (storage->use_table_fd)
finished_generate = true;
/// Close the file early once the stream has ended.
reader->readSuffix();
reader.reset();
read_buf.reset();
}
return {};
}
void setProgressCallback(const ProgressCallback & callback) override
{
/// Add file progress callback only for clickhouse-local.
if (context->getApplicationType() != Context::ApplicationType::LOCAL)
{
progress_callback = callback;
return;
}
auto file_progress_callback = [this](const Progress & progress)
{
static size_t increment = 0;
static const char * indicators[8] =
{
"\033[1;30m→\033[0m",
"\033[1;31m↘\033[0m",
"\033[1;32m↓\033[0m",
"\033[1;33m↙\033[0m",
"\033[1;34m←\033[0m",
"\033[1;35m↖\033[0m",
"\033[1;36m↑\033[0m",
"\033[1m↗\033[0m",
};
size_t terminal_width = getTerminalWidth();
auto & file_progress = context->getFileTableEngineProgress();
WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);
if (!file_progress.processed_bytes)
message << std::string(terminal_width, ' ');
file_progress.processed_bytes += progress.read_bytes;
file_progress.processed_rows += progress.read_rows;
/// Display the progress bar only after a short delay from query execution start.
size_t elapsed_ns = file_progress.watch.elapsed();
if (elapsed_ns > 25000000)
{
message << '\r';

const char * indicator = indicators[increment % 8];
size_t prefix_size = message.count();
size_t processed_bytes = file_progress.processed_bytes.load();

message << indicator << " Progress: ";
message << formatReadableQuantity(file_progress.processed_rows) << " rows, ";
message << formatReadableSizeWithDecimalSuffix(file_progress.processed_bytes) << " bytes. ";

size_t written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 1); /// Don't count invisible output (escape sequences).
ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width) - written_progress_chars - strlen(" 99%");

/// total_bytes_to_read is approximate, since it is taken as the file size (or the sum of the sizes of all files
/// matched by the paths generated for the File table engine), while progress.read_bytes is counted according to column types.
size_t total_bytes_corrected = std::max(processed_bytes, file_progress.total_bytes_to_process);
std::string bar = UnicodeBar::render(UnicodeBar::getWidth(processed_bytes, 0, total_bytes_corrected, width_of_progress_bar));

message << "\033[0;32m" << bar << "\033[0m";
if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');

message << ' ' << std::min((99 * file_progress.processed_bytes / file_progress.total_bytes_to_process), static_cast<size_t>(99)) << '%';
}
++increment;
};
/// Progress callback can be added via context or via method in SourceWithProgress.
/// In executeQuery a callback from context is wrapped into another
/// progress callback and then passed to SourceWithProgress. Here another callback is
/// added here so that previous callbacks are not overridden and other callbacks do not override this one.
progress_callback = [callback, file_progress_callback](const Progress & progress)
{
callback(progress);
file_progress_callback(progress);
};
}
private:
std::shared_ptr<StorageFile> storage;
StorageMetadataPtr metadata_snapshot;
FilesInfoPtr files_info;
String current_path;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
ColumnsDescription columns_description;
ContextPtr context; /// TODO Untangle potential issues with context lifetime.
UInt64 max_block_size;
bool finished_generate = false;
std::shared_lock<std::shared_timed_mutex> shared_lock;
std::unique_lock<std::shared_timed_mutex> unique_lock;
};
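
/// Builds one pipe per source, with at most one source per matched file (num_streams is capped
/// by the number of paths), so a single large file is read by a single stream.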
Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
BlockInputStreams blocks_input;

if (use_table_fd) /// When reading from a file descriptor, paths is empty; add a single dummy entry so that exactly one source is created.
paths = {""};
else
if (paths.size() == 1 && !Poco::File(paths[0]).exists())
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
else
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}

auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;

for (const auto & column : column_names)
{
if (column == "_path")
files_info->need_path_column = true;
if (column == "_file")
files_info->need_file_column = true;
}

auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());

if (num_streams > paths.size())
num_streams = paths.size();

Pipes pipes;
pipes.reserve(num_streams);

/// For clickhouse-local, pass the approximate number of bytes to read so that a progress bar can be displayed.
if (context->getApplicationType() == Context::ApplicationType::LOCAL)
context->setFileTableEngineApproxBytesToProcess(total_bytes_to_read);

for (size_t i = 0; i < num_streams; ++i)
{
const auto get_columns_for_format = [&]() -> ColumnsDescription
{
if (isColumnOriented())
return ColumnsDescription{
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()};
else
return metadata_snapshot->getColumns();
};

pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
}

return Pipe::unitePipes(std::move(pipes));
}
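
/// Appends INSERTed blocks to the table's single file (or to the bound file descriptor) in the table's format.
/// Writing is rejected when the path contains globs, since there is no single file to append to.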
class StorageFileBlockOutputStream : public IBlockOutputStream
{
public:
explicit StorageFileBlockOutputStream(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method,
ContextPtr context,
const std::optional<FormatSettings> & format_settings,
int & flags)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);

std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
{
/** NOTE: Using a real file bound to the FD may be misleading:
* SELECT *; INSERT insert_data; SELECT *; the last SELECT returns initial_fd_data + insert_data
* INSERT data; SELECT *; the last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd, DBMS_DEFAULT_BUFFER_SIZE);
}
else
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
flags |= O_WRONLY | O_APPEND | O_CREAT;
naked_buffer = std::make_unique<WriteBufferFromFile>(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
}

/// In case of CSVWithNames, the prefix (header) has already been written if the file is not empty.
if (naked_buffer->size())
prefix_written = true;

write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);

writer = FormatFactory::instance().getOutputStreamParallelIfPossible(storage.format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context,
{}, format_settings);
}

Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override
{
if (!prefix_written)
writer->writePrefix();
prefix_written = true;
}
void writeSuffix() override
{
writer->writeSuffix();
}
void flush() override
{
writer->flush();
}
private:
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool prefix_written{false};
};
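
/// By default INSERT appends to the existing file; with the engine_file_truncate_on_insert setting
/// the file is opened with O_TRUNC and overwritten instead.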
BlockOutputStreamPtr StorageFile::write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context)
{
if (format_name == "Distributed")
throw Exception("Method write is not implemented for Distributed format", ErrorCodes::NOT_IMPLEMENTED);
int flags = 0;
std::string path;
if (context->getSettingsRef().engine_file_truncate_on_insert)
flags |= O_TRUNC;
if (!paths.empty())
{
path = paths[0];
Poco::File(Poco::Path(path).makeParent()).createDirectories();
}
return std::make_shared<StorageFileBlockOutputStream>(
*this,
metadata_snapshot,
std::unique_lock{rwlock, getLockTimeout(context)},
chooseCompressionMethod(path, compression_method),
context,
format_settings,
flags);
}
bool StorageFile::storesDataOnDisk() const
{
return is_db_table;
}

Strings StorageFile::getDataPaths() const
{
if (paths.empty())
throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
return paths;
}
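
/// Renaming is only supported for tables whose data lives inside the database directory;
/// the data file is moved to the new table path.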
void StorageFile::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{
if (!is_db_table)
throw Exception("Can't rename table " + getStorageID().getNameForLogs() + " bound to a user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);

if (paths.size() != 1)
throw Exception("Can't rename table " + getStorageID().getNameForLogs() + " in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);

std::string path_new = getTablePath(base_path + new_path_to_table_data, format_name);
if (path_new == paths[0])
return;

Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(paths[0]).renameTo(path_new);
paths[0] = std::move(path_new);

renameInMemory(new_table_id);
}
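
/// TRUNCATE shrinks the single underlying file (or the file descriptor) to zero length in place.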
void StorageFile::truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
TableExclusiveLockHolder &)
{
if (paths.size() != 1)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);

if (use_table_fd)
{
if (0 != ::ftruncate(table_fd, 0))
throwFromErrno("Cannot truncate file at fd " + toString(table_fd), ErrorCodes::CANNOT_TRUNCATE_FILE);
}
else
{
if (!Poco::File(paths[0]).exists())
return;

if (0 != ::truncate(paths[0].c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE);
}
}
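
/// Registers the File table engine. The engine takes 1 to 3 arguments: format name, source
/// (a path under user_files or a file descriptor such as stdin), and compression method.
/// Minimal illustrative example (hypothetical table name):
///     CREATE TABLE file_table (a UInt64, b String) ENGINE = File(TabSeparated);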
void registerStorageFile(StorageFactory & factory)
{
StorageFactory::StorageFeatures storage_features{
.supports_settings = true,
.source_access_type = AccessType::FILE
};
factory.registerStorage(
"File",
[](const StorageFactory::Arguments & factory_args)
{
StorageFile::CommonArguments storage_args
{
WithContext(factory_args.getContext()),
factory_args.table_id,
{},
{},
{},
factory_args.columns,
factory_args.constraints,
};
ASTs & engine_args_ast = factory_args.engine_args;
if (!(engine_args_ast.size() >= 1 && engine_args_ast.size() <= 3)) // NOLINT
throw Exception(
"Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args_ast[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[0], factory_args.getLocalContext());
storage_args.format_name = engine_args_ast[0]->as<ASTLiteral &>().value.safeGet<String>();
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
if (factory_args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = factory_args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(
factory_args.storage_def->settings->changes);
storage_args.format_settings = getFormatSettings(
factory_args.getContext(), user_format_settings);
}
else
{
storage_args.format_settings = getFormatSettings(
factory_args.getContext());
}
if (engine_args_ast.size() == 1) /// Table in database
return StorageFile::create(factory_args.relative_data_path, storage_args);
/// Will use FD if engine_args[1] is int literal or identifier with std* name
int source_fd = -1;
String source_path;
if (auto opt_name = tryGetIdentifierName(engine_args_ast[1]))
{
if (*opt_name == "stdin")
source_fd = STDIN_FILENO;
else if (*opt_name == "stdout")
source_fd = STDOUT_FILENO;
else if (*opt_name == "stderr")
source_fd = STDERR_FILENO;
else
throw Exception(
"Unknown identifier '" + *opt_name + "' in second arg of File storage constructor", ErrorCodes::UNKNOWN_IDENTIFIER);
}
else if (const auto * literal = engine_args_ast[1]->as<ASTLiteral>())
{
auto type = literal->value.getType();
if (type == Field::Types::Int64)
source_fd = static_cast<int>(literal->value.get<Int64>());
else if (type == Field::Types::UInt64)
source_fd = static_cast<int>(literal->value.get<UInt64>());
else if (type == Field::Types::String)
source_path = literal->value.get<String>();
else
throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS);
}
if (engine_args_ast.size() == 3)
{
engine_args_ast[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args_ast[2], factory_args.getLocalContext());
storage_args.compression_method = engine_args_ast[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
storage_args.compression_method = "auto";
if (0 <= source_fd) /// File descriptor
return StorageFile::create(source_fd, storage_args);
else /// User's file
return StorageFile::create(source_path, factory_args.getContext()->getUserFilesPath(), storage_args);
},
storage_features);
}
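
/// Virtual columns: _path is the full path of the file a row was read from, _file is its file name.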
NamesAndTypesList StorageFile::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeString>()},
{"_file", std::make_shared<DataTypeString>()}
};
}

}