Revert "Avoid deadlocks in Log/TinyLog"

This commit is contained in:
alexey-milovidov 2020-09-25 02:28:57 +03:00 committed by GitHub
parent 2566478349
commit 13529265c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 79 additions and 307 deletions

View File

@ -41,11 +41,6 @@ void FileChecker::setEmpty(const String & full_file_path)
map[fileName(full_file_path)] = 0; map[fileName(full_file_path)] = 0;
} }
FileChecker::Map FileChecker::getFileSizes() const
{
return map;
}
CheckResults FileChecker::check() const CheckResults FileChecker::check() const
{ {
// Read the files again every time you call `check` - so as not to violate the constancy. // Read the files again every time you call `check` - so as not to violate the constancy.

View File

@ -27,12 +27,10 @@ public:
/// The purpose of this function is to rollback a group of unfinished writes. /// The purpose of this function is to rollback a group of unfinished writes.
void repair(); void repair();
private:
/// File name -> size. /// File name -> size.
using Map = std::map<String, UInt64>; using Map = std::map<String, UInt64>;
Map getFileSizes() const;
private:
void initialize(); void initialize();
void updateImpl(const String & file_path); void updateImpl(const String & file_path);
void load(Map & local_map, const String & path) const; void load(Map & local_map, const String & path) const;

View File

@ -52,7 +52,6 @@ namespace ErrorCodes
extern const int UNKNOWN_IDENTIFIER; extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME; extern const int INCORRECT_FILE_NAME;
extern const int FILE_DOESNT_EXIST; extern const int FILE_DOESNT_EXIST;
extern const int TIMEOUT_EXCEEDED;
} }
namespace namespace
@ -200,17 +199,6 @@ StorageFile::StorageFile(CommonArguments args)
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
} }
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
class StorageFileSource : public SourceWithProgress class StorageFileSource : public SourceWithProgress
{ {
public: public:
@ -257,9 +245,7 @@ public:
{ {
if (storage->use_table_fd) if (storage->use_table_fd)
{ {
unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context)); unique_lock = std::unique_lock(storage->rwlock);
if (!unique_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache /// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported. /// and add ability to seek unseekable files, but cache sync isn't supported.
@ -278,9 +264,7 @@ public:
} }
else else
{ {
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context)); shared_lock = std::shared_lock(storage->rwlock);
if (!shared_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
} }
} }
@ -389,8 +373,8 @@ private:
bool finished_generate = false; bool finished_generate = false;
std::shared_lock<std::shared_timed_mutex> shared_lock; std::shared_lock<std::shared_mutex> shared_lock;
std::unique_lock<std::shared_timed_mutex> unique_lock; std::unique_lock<std::shared_mutex> unique_lock;
}; };
@ -433,7 +417,7 @@ Pipe StorageFile::read(
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>( pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults())); this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
} }
@ -445,16 +429,12 @@ public:
explicit StorageFileBlockOutputStream( explicit StorageFileBlockOutputStream(
StorageFile & storage_, StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method, const CompressionMethod compression_method,
const Context & context) const Context & context)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_)) , lock(storage.rwlock)
{ {
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr; std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd) if (storage.use_table_fd)
{ {
@ -508,7 +488,7 @@ public:
private: private:
StorageFile & storage; StorageFile & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock; std::unique_lock<std::shared_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf; std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer; BlockOutputStreamPtr writer;
bool prefix_written{false}; bool prefix_written{false};
@ -526,7 +506,7 @@ BlockOutputStreamPtr StorageFile::write(
if (!paths.empty()) if (!paths.empty())
path = paths[0]; path = paths[0];
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}, return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
chooseCompressionMethod(path, compression_method), context); chooseCompressionMethod(path, compression_method), context);
} }
@ -549,6 +529,8 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID
if (path_new == paths[0]) if (path_new == paths[0])
return; return;
std::unique_lock<std::shared_mutex> lock(rwlock);
Poco::File(Poco::Path(path_new).parent()).createDirectories(); Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(paths[0]).renameTo(path_new); Poco::File(paths[0]).renameTo(path_new);
@ -565,6 +547,8 @@ void StorageFile::truncate(
if (paths.size() != 1) if (paths.size() != 1)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED); throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
std::unique_lock<std::shared_mutex> lock(rwlock);
if (use_table_fd) if (use_table_fd)
{ {
if (0 != ::ftruncate(table_fd, 0)) if (0 != ::ftruncate(table_fd, 0))

View File

@ -89,7 +89,7 @@ private:
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
mutable std::shared_timed_mutex rwlock; mutable std::shared_mutex rwlock;
Poco::Logger * log = &Poco::Logger::get("StorageFile"); Poco::Logger * log = &Poco::Logger::get("StorageFile");
}; };

View File

@ -39,7 +39,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int DUPLICATE_COLUMN; extern const int DUPLICATE_COLUMN;
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT; extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
@ -51,6 +50,7 @@ namespace ErrorCodes
class LogSource final : public SourceWithProgress class LogSource final : public SourceWithProgress
{ {
public: public:
static Block getHeader(const NamesAndTypesList & columns) static Block getHeader(const NamesAndTypesList & columns)
{ {
Block res; Block res;
@ -116,16 +116,13 @@ private:
class LogBlockOutputStream final : public IBlockOutputStream class LogBlockOutputStream final : public IBlockOutputStream
{ {
public: public:
explicit LogBlockOutputStream( explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_)) , lock(storage.rwlock)
, marks_stream( , marks_stream(
storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite)) storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
{ {
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
} }
~LogBlockOutputStream() override ~LogBlockOutputStream() override
@ -152,7 +149,7 @@ public:
private: private:
StorageLog & storage; StorageLog & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock; std::unique_lock<std::shared_mutex> lock;
bool done = false; bool done = false;
struct Stream struct Stream
@ -510,11 +507,9 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
} }
void StorageLog::loadMarks(std::chrono::seconds lock_timeout) void StorageLog::loadMarks()
{ {
std::unique_lock lock(rwlock, lock_timeout); std::unique_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
if (loaded_marks) if (loaded_marks)
return; return;
@ -557,6 +552,8 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
{ {
assert(table_path != new_path_to_table_data); assert(table_path != new_path_to_table_data);
{ {
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->moveDirectory(table_path, new_path_to_table_data); disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data; table_path = new_path_to_table_data;
@ -572,6 +569,8 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{ {
std::shared_lock<std::shared_mutex> lock(rwlock);
files.clear(); files.clear();
file_count = 0; file_count = 0;
loaded_marks = false; loaded_marks = false;
@ -611,17 +610,6 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
return it->second.marks; return it->second.marks;
} }
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
Pipe StorageLog::read( Pipe StorageLog::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
@ -632,15 +620,11 @@ Pipe StorageLog::read(
unsigned num_streams) unsigned num_streams)
{ {
metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
loadMarks();
auto lock_timeout = getLockTimeout(context);
loadMarks(lock_timeout);
NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)); NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names));
std::shared_lock lock(rwlock, lock_timeout); std::shared_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
Pipes pipes; Pipes pipes;
@ -669,28 +653,18 @@ Pipe StorageLog::read(
max_read_buffer_size)); max_read_buffer_size));
} }
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
} }
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{ {
auto lock_timeout = getLockTimeout(context); loadMarks();
loadMarks(lock_timeout); return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot);
std::unique_lock lock(rwlock, lock_timeout);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
} }
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & context) CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{ {
std::shared_lock lock(rwlock, getLockTimeout(context)); std::shared_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.check(); return file_checker.check();
} }

View File

@ -83,7 +83,7 @@ private:
DiskPtr disk; DiskPtr disk;
String table_path; String table_path;
mutable std::shared_timed_mutex rwlock; mutable std::shared_mutex rwlock;
Files files; Files files;
@ -104,7 +104,7 @@ private:
/// Read marks files if they are not already read. /// Read marks files if they are not already read.
/// It is done lazily, so that with a large number of tables, the server starts quickly. /// It is done lazily, so that with a large number of tables, the server starts quickly.
/// You can not call with a write locked `rwlock`. /// You can not call with a write locked `rwlock`.
void loadMarks(std::chrono::seconds lock_timeout); void loadMarks();
/** For normal columns, the number of rows in the block is specified in the marks. /** For normal columns, the number of rows in the block is specified in the marks.
* For array columns and nested structures, there are more than one group of marks that correspond to different files * For array columns and nested structures, there are more than one group of marks that correspond to different files

View File

@ -47,13 +47,13 @@ namespace ErrorCodes
{ {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME; extern const int INCORRECT_FILE_NAME;
extern const int TIMEOUT_EXCEEDED;
} }
class StripeLogSource final : public SourceWithProgress class StripeLogSource final : public SourceWithProgress
{ {
public: public:
static Block getHeader( static Block getHeader(
StorageStripeLog & storage, StorageStripeLog & storage,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
@ -157,11 +157,10 @@ private:
class StripeLogBlockOutputStream final : public IBlockOutputStream class StripeLogBlockOutputStream final : public IBlockOutputStream
{ {
public: public:
explicit StripeLogBlockOutputStream( explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, lock(std::move(lock_)) , lock(storage.rwlock)
, data_out_file(storage.table_path + "data.bin") , data_out_file(storage.table_path + "data.bin")
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)) , data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, data_out(std::make_unique<CompressedWriteBuffer>( , data_out(std::make_unique<CompressedWriteBuffer>(
@ -171,8 +170,6 @@ public:
, index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed)) , index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed))
, block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file)) , block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file))
{ {
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
} }
~StripeLogBlockOutputStream() override ~StripeLogBlockOutputStream() override
@ -226,7 +223,7 @@ public:
private: private:
StorageStripeLog & storage; StorageStripeLog & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock; std::unique_lock<std::shared_mutex> lock;
String data_out_file; String data_out_file;
std::unique_ptr<WriteBuffer> data_out_compressed; std::unique_ptr<WriteBuffer> data_out_compressed;
@ -289,6 +286,8 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
{ {
assert(table_path != new_path_to_table_data); assert(table_path != new_path_to_table_data);
{ {
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->moveDirectory(table_path, new_path_to_table_data); disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data; table_path = new_path_to_table_data;
@ -298,16 +297,6 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
} }
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
Pipe StorageStripeLog::read( Pipe StorageStripeLog::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
@ -317,9 +306,7 @@ Pipe StorageStripeLog::read(
const size_t /*max_block_size*/, const size_t /*max_block_size*/,
unsigned num_streams) unsigned num_streams)
{ {
std::shared_lock lock(rwlock, getLockTimeout(context)); std::shared_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
@ -358,28 +345,24 @@ Pipe StorageStripeLog::read(
} }
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{ {
std::unique_lock lock(rwlock, getLockTimeout(context)); return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
} }
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & context) CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{ {
std::shared_lock lock(rwlock, getLockTimeout(context)); std::shared_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.check(); return file_checker.check();
} }
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{ {
std::shared_lock<std::shared_mutex> lock(rwlock);
disk->clearDirectory(table_path); disk->clearDirectory(table_path);
file_checker = FileChecker{disk, table_path + "sizes.json"}; file_checker = FileChecker{disk, table_path + "sizes.json"};
} }

View File

@ -67,7 +67,7 @@ private:
size_t max_compress_block_size; size_t max_compress_block_size;
FileChecker file_checker; FileChecker file_checker;
mutable std::shared_timed_mutex rwlock; mutable std::shared_mutex rwlock;
Poco::Logger * log; Poco::Logger * log;
}; };

View File

@ -13,7 +13,6 @@
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h> #include <IO/WriteBufferFromFileBase.h>
#include <IO/LimitReadBuffer.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
@ -47,7 +46,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int TIMEOUT_EXCEEDED;
extern const int DUPLICATE_COLUMN; extern const int DUPLICATE_COLUMN;
extern const int INCORRECT_FILE_NAME; extern const int INCORRECT_FILE_NAME;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -57,6 +55,7 @@ namespace ErrorCodes
class TinyLogSource final : public SourceWithProgress class TinyLogSource final : public SourceWithProgress
{ {
public: public:
static Block getHeader(const NamesAndTypesList & columns) static Block getHeader(const NamesAndTypesList & columns)
{ {
Block res; Block res;
@ -67,17 +66,10 @@ public:
return Nested::flatten(res); return Nested::flatten(res);
} }
TinyLogSource( TinyLogSource(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
size_t block_size_,
const NamesAndTypesList & columns_,
StorageTinyLog & storage_,
size_t max_read_buffer_size_,
FileChecker::Map file_sizes_)
: SourceWithProgress(getHeader(columns_)) : SourceWithProgress(getHeader(columns_))
, block_size(block_size_), columns(columns_), storage(storage_) , block_size(block_size_), columns(columns_), storage(storage_), lock(storage_.rwlock)
, max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_)) , max_read_buffer_size(max_read_buffer_size_) {}
{
}
String getName() const override { return "TinyLog"; } String getName() const override { return "TinyLog"; }
@ -88,21 +80,19 @@ private:
size_t block_size; size_t block_size;
NamesAndTypesList columns; NamesAndTypesList columns;
StorageTinyLog & storage; StorageTinyLog & storage;
std::shared_lock<std::shared_mutex> lock;
bool is_finished = false; bool is_finished = false;
size_t max_read_buffer_size; size_t max_read_buffer_size;
FileChecker::Map file_sizes;
struct Stream struct Stream
{ {
Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size) Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_)
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))), : plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
limited(std::make_unique<LimitReadBuffer>(*plain, file_size, false)),
compressed(*plain) compressed(*plain)
{ {
} }
std::unique_ptr<ReadBuffer> plain; std::unique_ptr<ReadBuffer> plain;
std::unique_ptr<ReadBuffer> limited;
CompressedReadBuffer compressed; CompressedReadBuffer compressed;
}; };
@ -120,14 +110,9 @@ private:
class TinyLogBlockOutputStream final : public IBlockOutputStream class TinyLogBlockOutputStream final : public IBlockOutputStream
{ {
public: public:
explicit TinyLogBlockOutputStream( explicit TinyLogBlockOutputStream(StorageTinyLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
StorageTinyLog & storage_, : storage(storage_), metadata_snapshot(metadata_snapshot_), lock(storage_.rwlock)
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
{ {
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
} }
~TinyLogBlockOutputStream() override ~TinyLogBlockOutputStream() override
@ -155,7 +140,7 @@ public:
private: private:
StorageTinyLog & storage; StorageTinyLog & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_timed_mutex> lock; std::unique_lock<std::shared_mutex> lock;
bool done = false; bool done = false;
struct Stream struct Stream
@ -246,17 +231,13 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum
String stream_name = IDataType::getFileNameForStream(name, path); String stream_name = IDataType::getFileNameForStream(name, path);
if (!streams.count(stream_name)) if (!streams.count(stream_name))
{ streams[stream_name] = std::make_unique<Stream>(storage.disk, storage.files[stream_name].data_file_path, max_read_buffer_size);
String file_path = storage.files[stream_name].data_file_path;
streams[stream_name] = std::make_unique<Stream>(
storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]);
}
return &streams[stream_name]->compressed; return &streams[stream_name]->compressed;
}; };
if (deserialize_states.count(name) == 0) if (deserialize_states.count(name) == 0)
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]); type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
} }
@ -429,6 +410,8 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
{ {
assert(table_path != new_path_to_table_data); assert(table_path != new_path_to_table_data);
{ {
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->moveDirectory(table_path, new_path_to_table_data); disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data; table_path = new_path_to_table_data;
@ -441,16 +424,6 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
} }
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
Pipe StorageTinyLog::read( Pipe StorageTinyLog::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
@ -464,40 +437,28 @@ Pipe StorageTinyLog::read(
// When reading, we lock the entire storage, because we only have one file // When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently. // per column and can't modify it concurrently.
const Settings & settings = context.getSettingsRef();
std::shared_lock lock{rwlock, getLockTimeout(context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
return Pipe(std::make_shared<TinyLogSource>( return Pipe(std::make_shared<TinyLogSource>(
max_block_size, max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)),
*this,
settings.max_read_buffer_size,
file_checker.getFileSizes()));
} }
BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context) BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{ {
return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)}); return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot);
} }
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & context) CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{ {
std::shared_lock lock(rwlock, getLockTimeout(context)); std::shared_lock<std::shared_mutex> lock(rwlock);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.check(); return file_checker.check();
} }
void StorageTinyLog::truncate( void StorageTinyLog::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
{ {
std::unique_lock<std::shared_mutex> lock(rwlock);
disk->clearDirectory(table_path); disk->clearDirectory(table_path);
files.clear(); files.clear();
@ -507,6 +468,14 @@ void StorageTinyLog::truncate(
addFiles(column.name, *column.type); addFiles(column.name, *column.type);
} }
void StorageTinyLog::drop()
{
std::unique_lock<std::shared_mutex> lock(rwlock);
if (disk->exists(table_path))
disk->removeRecursive(table_path);
files.clear();
}
void registerStorageTinyLog(StorageFactory & factory) void registerStorageTinyLog(StorageFactory & factory)
{ {

View File

@ -43,6 +43,8 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override; void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
void drop() override;
protected: protected:
StorageTinyLog( StorageTinyLog(
DiskPtr disk_, DiskPtr disk_,
@ -68,7 +70,7 @@ private:
Files files; Files files;
FileChecker file_checker; FileChecker file_checker;
mutable std::shared_timed_mutex rwlock; mutable std::shared_mutex rwlock;
Poco::Logger * log; Poco::Logger * log;

View File

@ -1,3 +0,0 @@
6
6
6

View File

@ -1,26 +0,0 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt8) ENGINE = TinyLog;
INSERT INTO t VALUES (1), (2), (3);
INSERT INTO t SELECT * FROM t;
SELECT count() FROM t;
DROP TABLE t;
CREATE TABLE t (x UInt8) ENGINE = Log;
INSERT INTO t VALUES (1), (2), (3);
INSERT INTO t SELECT * FROM t;
SELECT count() FROM t;
DROP TABLE t;
CREATE TABLE t (x UInt8) ENGINE = StripeLog;
INSERT INTO t VALUES (1), (2), (3);
INSERT INTO t SELECT * FROM t;
SELECT count() FROM t;
DROP TABLE t;

View File

@ -1,6 +0,0 @@
Testing TinyLog
Done TinyLog
Testing StripeLog
Done StripeLog
Testing Log
Done Log

View File

@ -1,85 +0,0 @@
#!/usr/bin/env bash
set -e
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
function thread_create {
while true; do
$CLICKHOUSE_CLIENT --query "CREATE TABLE IF NOT EXISTS $1 (x UInt64, s Array(Nullable(String))) ENGINE = $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
sleep 0.0$RANDOM
done
}
function thread_drop {
while true; do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS $1" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
sleep 0.0$RANDOM
done
}
function thread_rename {
while true; do
$CLICKHOUSE_CLIENT --query "RENAME TABLE $1 TO $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
sleep 0.0$RANDOM
done
}
function thread_select {
while true; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM $1 FORMAT Null" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
sleep 0.0$RANDOM
done
}
function thread_insert {
while true; do
$CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT rand64(1), [toString(rand64(2))] FROM numbers($2)" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
sleep 0.0$RANDOM
done
}
function thread_insert_select {
while true; do
$CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT * FROM $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
sleep 0.0$RANDOM
done
}
export -f thread_create
export -f thread_drop
export -f thread_rename
export -f thread_select
export -f thread_insert
export -f thread_insert_select
# Do randomized queries and expect nothing extraordinary happens.
function test_with_engine {
echo "Testing $1"
timeout 10 bash -c "thread_create t1 $1" &
timeout 10 bash -c "thread_create t2 $1" &
timeout 10 bash -c 'thread_drop t1' &
timeout 10 bash -c 'thread_drop t2' &
timeout 10 bash -c 'thread_rename t1 t2' &
timeout 10 bash -c 'thread_rename t2 t1' &
timeout 10 bash -c 'thread_select t1' &
timeout 10 bash -c 'thread_select t2' &
timeout 10 bash -c 'thread_insert t1 5' &
timeout 10 bash -c 'thread_insert t2 10' &
timeout 10 bash -c 'thread_insert_select t1 t2' &
timeout 10 bash -c 'thread_insert_select t2 t1' &
wait
echo "Done $1"
}
test_with_engine TinyLog
test_with_engine StripeLog
test_with_engine Log

View File

@ -1,12 +0,0 @@
DROP TABLE IF EXISTS t_local;
DROP TABLE IF EXISTS t_dist;
create table t_local(a int) engine Log;
create table t_dist (a int) engine Distributed(test_shard_localhost, currentDatabase(), 't_local', cityHash64(a));
set insert_distributed_sync = 1;
insert into t_dist values (1);
DROP TABLE t_local;
DROP TABLE t_dist;

View File

@ -145,4 +145,3 @@
01461_query_start_time_microseconds 01461_query_start_time_microseconds
01455_shard_leaf_max_rows_bytes_to_read 01455_shard_leaf_max_rows_bytes_to_read
01505_distributed_local_type_conversion_enum 01505_distributed_local_type_conversion_enum
01505_log_distributed_deadlock