Continuation

This commit is contained in:
Alexey Milovidov 2020-09-18 00:55:53 +03:00
parent bc8fc3e280
commit 111acdc63b
9 changed files with 129 additions and 43 deletions

View File

@ -52,6 +52,7 @@ namespace ErrorCodes
extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME;
extern const int FILE_DOESNT_EXIST;
extern const int TIMEOUT_EXCEEDED;
}
namespace
@ -199,6 +200,17 @@ StorageFile::StorageFile(CommonArguments args)
setInMemoryMetadata(storage_metadata);
}
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
class StorageFileSource : public SourceWithProgress
{
public:
@ -245,7 +257,9 @@ public:
{
if (storage->use_table_fd)
{
unique_lock = std::unique_lock(storage->rwlock);
unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context));
if (!unique_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
/// and add ability to seek unseekable files, but cache sync isn't supported.
@ -264,7 +278,9 @@ public:
}
else
{
shared_lock = std::shared_lock(storage->rwlock);
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
if (!shared_lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
@ -373,8 +389,8 @@ private:
bool finished_generate = false;
std::shared_lock<std::shared_mutex> shared_lock;
std::unique_lock<std::shared_mutex> unique_lock;
std::shared_lock<std::shared_timed_mutex> shared_lock;
std::unique_lock<std::shared_timed_mutex> unique_lock;
};
@ -417,7 +433,7 @@ Pipe StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
return Pipe::unitePipes(std::move(pipes));
}
@ -429,12 +445,16 @@ public:
explicit StorageFileBlockOutputStream(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
std::unique_lock<std::shared_timed_mutex> && lock_,
const CompressionMethod compression_method,
const Context & context)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
, lock(std::move(lock_))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
if (storage.use_table_fd)
{
@ -488,7 +508,7 @@ public:
private:
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
std::unique_lock<std::shared_timed_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool prefix_written{false};
@ -506,7 +526,7 @@ BlockOutputStreamPtr StorageFile::write(
if (!paths.empty())
path = paths[0];
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)},
chooseCompressionMethod(path, compression_method), context);
}
@ -529,8 +549,6 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID
if (path_new == paths[0])
return;
std::unique_lock<std::shared_mutex> lock(rwlock);
Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(paths[0]).renameTo(path_new);
@ -547,8 +565,6 @@ void StorageFile::truncate(
if (paths.size() != 1)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
std::unique_lock<std::shared_mutex> lock(rwlock);
if (use_table_fd)
{
if (0 != ::ftruncate(table_fd, 0))

View File

@ -89,7 +89,7 @@ private:
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
mutable std::shared_mutex rwlock;
mutable std::shared_timed_mutex rwlock;
Poco::Logger * log = &Poco::Logger::get("StorageFile");
};

View File

@ -39,6 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
extern const int DUPLICATE_COLUMN;
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
@ -50,7 +51,6 @@ namespace ErrorCodes
class LogSource final : public SourceWithProgress
{
public:
static Block getHeader(const NamesAndTypesList & columns)
{
Block res;
@ -116,13 +116,16 @@ private:
class LogBlockOutputStream final : public IBlockOutputStream
{
public:
explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
explicit LogBlockOutputStream(
StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
, lock(std::move(lock_))
, marks_stream(
storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
}
~LogBlockOutputStream() override
@ -149,7 +152,7 @@ public:
private:
StorageLog & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
std::unique_lock<std::shared_timed_mutex> lock;
bool done = false;
struct Stream
@ -507,9 +510,11 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
}
void StorageLog::loadMarks()
void StorageLog::loadMarks(std::chrono::seconds lock_timeout)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
std::unique_lock<std::shared_timed_mutex> lock(rwlock, lock_timeout);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
if (loaded_marks)
return;
@ -606,6 +611,17 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
return it->second.marks;
}
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
Pipe StorageLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -616,11 +632,15 @@ Pipe StorageLog::read(
unsigned num_streams)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
loadMarks();
auto lock_timeout = getLockTimeout(context);
loadMarks(lock_timeout);
NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names));
std::shared_lock<std::shared_mutex> lock(rwlock);
std::shared_lock<std::shared_timed_mutex> lock(rwlock, lock_timeout);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
Pipes pipes;
@ -649,18 +669,28 @@ Pipe StorageLog::read(
max_read_buffer_size));
}
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
return Pipe::unitePipes(std::move(pipes));
}
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
loadMarks();
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot);
auto lock_timeout = getLockTimeout(context);
loadMarks(lock_timeout);
std::unique_lock<std::shared_timed_mutex> lock(rwlock, lock_timeout);
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & context)
{
std::shared_lock<std::shared_mutex> lock(rwlock);
std::shared_lock<std::shared_timed_mutex> lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.check();
}

View File

@ -83,7 +83,7 @@ private:
DiskPtr disk;
String table_path;
mutable std::shared_mutex rwlock;
mutable std::shared_timed_mutex rwlock;
Files files;
@ -104,7 +104,7 @@ private:
/// Read marks files if they are not already read.
/// It is done lazily, so that with a large number of tables, the server starts quickly.
/// You can not call with a write locked `rwlock`.
void loadMarks();
void loadMarks(std::chrono::seconds lock_timeout);
/** For normal columns, the number of rows in the block is specified in the marks.
* For array columns and nested structures, there are more than one group of marks that correspond to different files

View File

@ -47,13 +47,13 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
extern const int TIMEOUT_EXCEEDED;
}
class StripeLogSource final : public SourceWithProgress
{
public:
static Block getHeader(
StorageStripeLog & storage,
const StorageMetadataPtr & metadata_snapshot,
@ -157,10 +157,11 @@ private:
class StripeLogBlockOutputStream final : public IBlockOutputStream
{
public:
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
explicit StripeLogBlockOutputStream(
StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
, lock(std::move(lock_))
, data_out_file(storage.table_path + "data.bin")
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, data_out(std::make_unique<CompressedWriteBuffer>(
@ -170,6 +171,8 @@ public:
, index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed))
, block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file))
{
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
}
~StripeLogBlockOutputStream() override
@ -223,7 +226,7 @@ public:
private:
StorageStripeLog & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
std::unique_lock<std::shared_timed_mutex> lock;
String data_out_file;
std::unique_ptr<WriteBuffer> data_out_compressed;
@ -295,6 +298,16 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
}
static std::chrono::seconds getLockTimeout(const Context & context)
{
const Settings & settings = context.getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
Pipe StorageStripeLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -304,7 +317,9 @@ Pipe StorageStripeLog::read(
const size_t /*max_block_size*/,
unsigned num_streams)
{
std::shared_lock<std::shared_mutex> lock(rwlock);
std::shared_lock<std::shared_timed_mutex> lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
@ -343,15 +358,22 @@ Pipe StorageStripeLog::read(
}
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot);
std::unique_lock<std::shared_timed_mutex> lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & context)
{
std::shared_lock<std::shared_mutex> lock(rwlock);
std::shared_lock<std::shared_timed_mutex> lock(rwlock, getLockTimeout(context));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.check();
}

View File

@ -67,7 +67,7 @@ private:
size_t max_compress_block_size;
FileChecker file_checker;
mutable std::shared_mutex rwlock;
mutable std::shared_timed_mutex rwlock;
Poco::Logger * log;
};

View File

@ -425,10 +425,6 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
{
assert(table_path != new_path_to_table_data);
{
std::unique_lock<std::shared_timed_mutex> lock(rwlock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC));
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data;

View File

@ -1,10 +1,30 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt8) ENGINE = TinyLog;
SET max_execution_time = 1;
SET max_execution_time = 1, lock_acquire_timeout = 1000;
INSERT INTO t SELECT * FROM t; -- { serverError 159 }
SET max_execution_time = 0, lock_acquire_timeout = 1;
INSERT INTO t SELECT * FROM t; -- { serverError 159 }
DROP TABLE t;
SET max_execution_time = 0, lock_acquire_timeout = 1000;
CREATE TABLE t (x UInt8) ENGINE = Log;
INSERT INTO t VALUES (1), (2), (3);
INSERT INTO t SELECT * FROM t;
SELECT count() FROM t;
DROP TABLE t;
CREATE TABLE t (x UInt8) ENGINE = StripeLog;
INSERT INTO t VALUES (1), (2), (3);
INSERT INTO t SELECT * FROM t;
SELECT count() FROM t;
DROP TABLE t;