Merge pull request #26777 from ClickHouse/bolonini-read-from-file

Merging #25960 (Bolonini/read_from_file)
This commit is contained in:
alexey-milovidov 2021-07-25 01:33:34 +03:00 committed by GitHub
commit 22fa1efacb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 146 additions and 41 deletions

View File

@ -9,6 +9,7 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
@ -24,6 +25,7 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
@ -46,7 +48,6 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_TRUNCATE_FILE;
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -55,6 +56,7 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
extern const int TIMEOUT_EXCEEDED;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT;
}
namespace
@ -169,10 +171,6 @@ StorageFile::StorageFile(int table_fd_, CommonArguments args)
is_db_table = false;
use_table_fd = true;
table_fd = table_fd_;
/// Save initial offset, it will be used for repeating SELECTs
/// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
table_fd_init_offset = lseek(table_fd, 0, SEEK_CUR);
}
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
@ -276,7 +274,8 @@ public:
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
return metadata_snapshot->getSampleBlockForColumns(
columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
else
return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column);
}
@ -296,28 +295,7 @@ public:
, context(context_)
, max_block_size(max_block_size_)
{
if (storage->use_table_fd)
{
unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context));
if (!unique_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported.
if (storage->table_fd_was_used) /// We need seek to initial position
{
if (storage->table_fd_init_offset < 0)
throw Exception("File descriptor isn't seekable, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
/// ReadBuffer's seek() doesn't make sense, since cache is empty
if (lseek(storage->table_fd, storage->table_fd_init_offset, SEEK_SET) < 0)
throwFromErrno("Cannot seek file descriptor, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
storage->table_fd_was_used = true;
}
else
if (!storage->use_table_fd)
{
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
if (!shared_lock)
@ -358,14 +336,32 @@ public:
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
struct stat file_stat{};
if (storage->use_table_fd)
{
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != fstat(storage->table_fd, &file_stat))
throwFromErrno("Cannot stat table file descriptor, inside " + storage->getName(), ErrorCodes::CANNOT_STAT);
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFileDescriptorPRead>(storage->table_fd);
else
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
method = chooseCompressionMethod("", storage->compression_method);
}
else
{
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != stat(current_path.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT);
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFilePRead>(current_path, context->getSettingsRef().max_read_buffer_size);
else
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
method = chooseCompressionMethod(current_path, storage->compression_method);
}
@ -459,9 +455,9 @@ private:
bool finished_generate = false;
std::shared_lock<std::shared_timed_mutex> shared_lock;
std::unique_lock<std::shared_timed_mutex> unique_lock;
};
Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -476,6 +472,7 @@ Pipe StorageFile::read(
if (use_table_fd) /// need to call ctr BlockInputStream
paths = {""}; /// when use fd, paths are empty
else
{
if (paths.size() == 1 && !fs::exists(paths[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
@ -483,7 +480,7 @@ Pipe StorageFile::read(
else
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}
}
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
@ -519,6 +516,7 @@ Pipe StorageFile::read(
else
return metadata_snapshot->getColumns();
};
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
}
@ -548,11 +546,6 @@ public:
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
{
/** NOTE: Using real file bounded to FD may be misleading:
* SELECT *; INSERT insert_data; SELECT *; last SELECT returns initil_fd_data + insert_data
* INSERT data; SELECT *; last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd, DBMS_DEFAULT_BUFFER_SIZE);
}
else

View File

@ -95,10 +95,8 @@ private:
std::string base_path;
std::vector<std::string> paths;
bool is_db_table = true; /// Table is stored in real database, not user's file
bool use_table_fd = false; /// Use table_fd instead of path
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
bool is_db_table = true; /// Table is stored in real database, not user's file
bool use_table_fd = false; /// Use table_fd instead of path
mutable std::shared_timed_mutex rwlock;

View File

@ -0,0 +1,86 @@
File generated:
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
******************
Read twice from a regular file
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
******************
Read twice from file descriptor that corresponds to a regular file
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA

View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
SAMPLE_FILE=$(mktemp 01947_multiple_pipe_read_sample_data_XXXXXX.csv)
echo 'File generated:'
${CLICKHOUSE_LOCAL} -q "SELECT number AS x, if(number in (4,6), 'AAA', 'BBB') AS s from numbers(7)" > "$SAMPLE_FILE"
cat "$SAMPLE_FILE"
echo '******************'
echo 'Read twice from a regular file'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table; select * from table;' --file "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table WHERE x IN (select x from table);' --file "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table UNION ALL select * from table;' --file "$SAMPLE_FILE"
echo '******************'
echo 'Read twice from file descriptor that corresponds to a regular file'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table; select * from table;' < "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table WHERE x IN (select x from table);' < "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table UNION ALL select * from table;' < "$SAMPLE_FILE"
rm "$SAMPLE_FILE"