Merge pull request #6226 from yandex/aku/lock-tinylog

Lock the TinyLog storage when reading.
This commit is contained in:
alexey-milovidov 2019-07-31 17:40:55 +03:00 committed by GitHub
commit ac5d2a225d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 2 deletions

View File

@ -59,7 +59,8 @@ class TinyLogBlockInputStream final : public IBlockInputStream
public: public:
TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_) TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
: block_size(block_size_), columns(columns_), : block_size(block_size_), columns(columns_),
storage(storage_), max_read_buffer_size(max_read_buffer_size_) {} storage(storage_), lock(storage_.rwlock),
max_read_buffer_size(max_read_buffer_size_) {}
String getName() const override { return "TinyLog"; } String getName() const override { return "TinyLog"; }
@ -79,6 +80,7 @@ private:
size_t block_size; size_t block_size;
NamesAndTypesList columns; NamesAndTypesList columns;
StorageTinyLog & storage; StorageTinyLog & storage;
std::shared_lock<std::shared_mutex> lock;
bool finished = false; bool finished = false;
size_t max_read_buffer_size; size_t max_read_buffer_size;
@ -109,7 +111,7 @@ class TinyLogBlockOutputStream final : public IBlockOutputStream
{ {
public: public:
explicit TinyLogBlockOutputStream(StorageTinyLog & storage_) explicit TinyLogBlockOutputStream(StorageTinyLog & storage_)
: storage(storage_) : storage(storage_), lock(storage_.rwlock)
{ {
} }
@ -132,6 +134,7 @@ public:
private: private:
StorageTinyLog & storage; StorageTinyLog & storage;
std::unique_lock<std::shared_mutex> lock;
bool done = false; bool done = false;
struct Stream struct Stream
@ -373,6 +376,8 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{ {
std::unique_lock<std::shared_mutex> lock(rwlock);
/// Rename directory with data. /// Rename directory with data.
Poco::File(path + escapeForFileName(table_name)).renameTo(new_path_to_db + escapeForFileName(new_table_name)); Poco::File(path + escapeForFileName(table_name)).renameTo(new_path_to_db + escapeForFileName(new_table_name));
@ -395,6 +400,8 @@ BlockInputStreams StorageTinyLog::read(
const unsigned /*num_streams*/) const unsigned /*num_streams*/)
{ {
check(column_names); check(column_names);
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>( return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
} }
@ -409,6 +416,7 @@ BlockOutputStreamPtr StorageTinyLog::write(
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */) CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{ {
std::shared_lock<std::shared_mutex> lock(rwlock);
return file_checker.check(); return file_checker.check();
} }
@ -417,6 +425,8 @@ void StorageTinyLog::truncate(const ASTPtr &, const Context &)
if (table_name.empty()) if (table_name.empty())
throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);
std::unique_lock<std::shared_mutex> lock(rwlock);
auto file = Poco::File(path + escapeForFileName(table_name)); auto file = Poco::File(path + escapeForFileName(table_name));
file.remove(true); file.remove(true);
file.createDirectories(); file.createDirectories();

View File

@ -65,6 +65,7 @@ private:
Files_t files; Files_t files;
FileChecker file_checker; FileChecker file_checker;
mutable std::shared_mutex rwlock;
Logger * log; Logger * log;