Allow to read file descriptor multiple times in File storage

This commit is contained in:
Alexey Milovidov 2021-07-24 19:50:03 +03:00
parent 456801ccb8
commit 774f6d2617
4 changed files with 126 additions and 78 deletions

View File

@ -173,10 +173,6 @@ StorageFile::StorageFile(int table_fd_, CommonArguments args)
is_db_table = false;
use_table_fd = true;
table_fd = table_fd_;
/// Save initial offset, it will be used for repeating SELECTs
/// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
table_fd_init_offset = lseek(table_fd, 0, SEEK_CUR);
}
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
@ -280,7 +276,8 @@ public:
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
return metadata_snapshot->getSampleBlockForColumns(
columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID());
else
return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column);
}
@ -300,11 +297,7 @@ public:
, context(context_)
, max_block_size(max_block_size_)
{
if (storage->use_table_fd)
{
storage->table_fd_was_used = true;
}
else
if (!storage->use_table_fd)
{
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
if (!shared_lock)
@ -345,14 +338,32 @@ public:
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
struct stat file_stat{};
if (storage->use_table_fd)
{
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != fstat(storage->table_fd, &file_stat))
throwFromErrno("Cannot stat table file descriptor, inside " + storage->getName(), ErrorCodes::CANNOT_STAT);
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFileDescriptorPRead>(storage->table_fd);
else
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
method = chooseCompressionMethod("", storage->compression_method);
}
else
{
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != stat(current_path.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT);
if (S_ISREG(file_stat.st_mode))
nested_buffer = std::make_unique<ReadBufferFromFilePRead>(current_path, context->getSettingsRef().max_read_buffer_size);
else
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
method = chooseCompressionMethod(current_path, storage->compression_method);
}
@ -446,9 +457,9 @@ private:
bool finished_generate = false;
std::shared_lock<std::shared_timed_mutex> shared_lock;
std::unique_lock<std::shared_timed_mutex> unique_lock;
};
Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -463,6 +474,7 @@ Pipe StorageFile::read(
if (use_table_fd) /// need to call ctr BlockInputStream
paths = {""}; /// when use fd, paths are empty
else
{
if (paths.size() == 1 && !fs::exists(paths[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
@ -470,7 +482,7 @@ Pipe StorageFile::read(
else
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}
}
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
@ -484,7 +496,6 @@ Pipe StorageFile::read(
}
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
ReadBufferFromFileDescriptorPRead table_fd_seek(this_ptr->table_fd);
if (num_streams > paths.size())
num_streams = paths.size();
@ -507,27 +518,7 @@ Pipe StorageFile::read(
else
return metadata_snapshot->getColumns();
};
// if we do multiple reads from pipe, we want to check if pipe is a regular file or a pipe
if (this_ptr->table_fd_was_used)
{
struct stat fd_stat;
if (fstat(this_ptr->table_fd, &fd_stat) == -1)
{
throw Exception("Cannot stat table file descriptor, inside " + this_ptr->getName(), ErrorCodes::CANNOT_STAT);
}
if (S_ISREG(fd_stat.st_mode))
{
if (this_ptr->table_fd_init_offset < 0)
{
throw Exception("File descriptor isn't seekable, inside " + this_ptr->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
}
else
{
// else if fd is not a regular file, then throw an Exception
throw Exception("Cannot read from a pipe twice", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
}
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
}
@ -557,11 +548,6 @@ public:
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
{
/** NOTE: Using real file bounded to FD may be misleading:
* SELECT *; INSERT insert_data; SELECT *; last SELECT returns initil_fd_data + insert_data
* INSERT data; SELECT *; last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
naked_buffer = std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd, DBMS_DEFAULT_BUFFER_SIZE);
}
else

View File

@ -97,8 +97,6 @@ private:
bool is_db_table = true; /// Table is stored in real database, not user's file
bool use_table_fd = false; /// Use table_fd instead of path
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
mutable std::shared_timed_mutex rwlock;

View File

@ -1,27 +1,86 @@
File generated:
0,"BBB"
1,"BBB"
2,"BBB"
3,"BBB"
4,"AAA"
5,"BBB"
6,"AAA"
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
******************
Attempt to read twice from a regular file
0,"BBB"
1,"BBB"
2,"BBB"
3,"BBB"
4,"AAA"
5,"BBB"
6,"AAA"
0,"BBB"
1,"BBB"
2,"BBB"
3,"BBB"
4,"AAA"
5,"BBB"
6,"AAA"
Read twice from a regular file
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
******************
Attempt to read twice from a pipe
OK: stderr contains a message 'Cannot read from a pipe twice'
Read twice from file descriptor that corresponds to a regular file
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
---
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA
0 BBB
1 BBB
2 BBB
3 BBB
4 AAA
5 BBB
6 AAA

View File

@ -4,20 +4,25 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
SAMPLE_FILE=$(mktemp 01947_multiple_pipe_read_sample_data_XXXXXX.csv)
STD_ERROR_CAPTURED=$(mktemp 01947_multiple_pipe_read_std_error_captured_XXXXXX.log)
echo 'File generated:'
${CLICKHOUSE_LOCAL} -q "SELECT number, if(number in (4,6), 'AAA', 'BBB') from numbers(7) FORMAT CSV" --format_csv_delimiter=, >"$SAMPLE_FILE"
${CLICKHOUSE_LOCAL} -q "SELECT number AS x, if(number in (4,6), 'AAA', 'BBB') AS s from numbers(7)" > "$SAMPLE_FILE"
cat "$SAMPLE_FILE"
echo '******************'
echo 'Attempt to read twice from a regular file'
${CLICKHOUSE_LOCAL} --structure 'key String' -q 'select * from table; select * from table;' --file "$SAMPLE_FILE"
echo 'Read twice from a regular file'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table; select * from table;' --file "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table WHERE x IN (select x from table);' --file "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table UNION ALL select * from table;' --file "$SAMPLE_FILE"
echo '******************'
echo 'Attempt to read twice from a pipe'
echo 1 | ${CLICKHOUSE_LOCAL} --structure "a int" --query "select a from table where a in (select a from table)" 2>"$STD_ERROR_CAPTURED"
expected_error_message='Cannot read from a pipe twice'
cat "$STD_ERROR_CAPTURED" | grep -q "$expected_error_message" && echo "OK: stderr contains a message '$expected_error_message'" || echo "FAILED: Error message is wrong"
echo 'Read twice from file descriptor that corresponds to a regular file'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table; select * from table;' < "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table WHERE x IN (select x from table);' < "$SAMPLE_FILE"
echo '---'
${CLICKHOUSE_LOCAL} --structure 'x UInt64, s String' -q 'select * from table UNION ALL select * from table;' < "$SAMPLE_FILE"
rm "$SAMPLE_FILE" "$STD_ERROR_CAPTURED"
rm "$SAMPLE_FILE"