mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Revert "Revert "Avoid deadlocks in Log/TinyLog""
This commit is contained in:
parent
89bbafbd90
commit
47d150b116
@ -41,6 +41,11 @@ void FileChecker::setEmpty(const String & full_file_path)
|
|||||||
map[fileName(full_file_path)] = 0;
|
map[fileName(full_file_path)] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FileChecker::Map FileChecker::getFileSizes() const
|
||||||
|
{
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
CheckResults FileChecker::check() const
|
CheckResults FileChecker::check() const
|
||||||
{
|
{
|
||||||
// Read the files again every time you call `check` - so as not to violate the constancy.
|
// Read the files again every time you call `check` - so as not to violate the constancy.
|
||||||
|
@ -27,10 +27,12 @@ public:
|
|||||||
/// The purpose of this function is to rollback a group of unfinished writes.
|
/// The purpose of this function is to rollback a group of unfinished writes.
|
||||||
void repair();
|
void repair();
|
||||||
|
|
||||||
private:
|
|
||||||
/// File name -> size.
|
/// File name -> size.
|
||||||
using Map = std::map<String, UInt64>;
|
using Map = std::map<String, UInt64>;
|
||||||
|
|
||||||
|
Map getFileSizes() const;
|
||||||
|
|
||||||
|
private:
|
||||||
void initialize();
|
void initialize();
|
||||||
void updateImpl(const String & file_path);
|
void updateImpl(const String & file_path);
|
||||||
void load(Map & local_map, const String & path) const;
|
void load(Map & local_map, const String & path) const;
|
||||||
|
@ -52,6 +52,7 @@ namespace ErrorCodes
|
|||||||
extern const int UNKNOWN_IDENTIFIER;
|
extern const int UNKNOWN_IDENTIFIER;
|
||||||
extern const int INCORRECT_FILE_NAME;
|
extern const int INCORRECT_FILE_NAME;
|
||||||
extern const int FILE_DOESNT_EXIST;
|
extern const int FILE_DOESNT_EXIST;
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
@ -199,6 +200,17 @@ StorageFile::StorageFile(CommonArguments args)
|
|||||||
setInMemoryMetadata(storage_metadata);
|
setInMemoryMetadata(storage_metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::chrono::seconds getLockTimeout(const Context & context)
|
||||||
|
{
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
|
||||||
|
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
|
||||||
|
lock_timeout = settings.max_execution_time.totalSeconds();
|
||||||
|
return std::chrono::seconds{lock_timeout};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class StorageFileSource : public SourceWithProgress
|
class StorageFileSource : public SourceWithProgress
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -245,7 +257,9 @@ public:
|
|||||||
{
|
{
|
||||||
if (storage->use_table_fd)
|
if (storage->use_table_fd)
|
||||||
{
|
{
|
||||||
unique_lock = std::unique_lock(storage->rwlock);
|
unique_lock = std::unique_lock(storage->rwlock, getLockTimeout(context));
|
||||||
|
if (!unique_lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
|
/// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
|
||||||
/// and add ability to seek unseekable files, but cache sync isn't supported.
|
/// and add ability to seek unseekable files, but cache sync isn't supported.
|
||||||
@ -264,7 +278,9 @@ public:
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
shared_lock = std::shared_lock(storage->rwlock);
|
shared_lock = std::shared_lock(storage->rwlock, getLockTimeout(context));
|
||||||
|
if (!shared_lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -373,8 +389,8 @@ private:
|
|||||||
|
|
||||||
bool finished_generate = false;
|
bool finished_generate = false;
|
||||||
|
|
||||||
std::shared_lock<std::shared_mutex> shared_lock;
|
std::shared_lock<std::shared_timed_mutex> shared_lock;
|
||||||
std::unique_lock<std::shared_mutex> unique_lock;
|
std::unique_lock<std::shared_timed_mutex> unique_lock;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -417,7 +433,7 @@ Pipe StorageFile::read(
|
|||||||
|
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
||||||
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
|
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
|
||||||
|
|
||||||
return Pipe::unitePipes(std::move(pipes));
|
return Pipe::unitePipes(std::move(pipes));
|
||||||
}
|
}
|
||||||
@ -429,12 +445,16 @@ public:
|
|||||||
explicit StorageFileBlockOutputStream(
|
explicit StorageFileBlockOutputStream(
|
||||||
StorageFile & storage_,
|
StorageFile & storage_,
|
||||||
const StorageMetadataPtr & metadata_snapshot_,
|
const StorageMetadataPtr & metadata_snapshot_,
|
||||||
|
std::unique_lock<std::shared_timed_mutex> && lock_,
|
||||||
const CompressionMethod compression_method,
|
const CompressionMethod compression_method,
|
||||||
const Context & context)
|
const Context & context)
|
||||||
: storage(storage_)
|
: storage(storage_)
|
||||||
, metadata_snapshot(metadata_snapshot_)
|
, metadata_snapshot(metadata_snapshot_)
|
||||||
, lock(storage.rwlock)
|
, lock(std::move(lock_))
|
||||||
{
|
{
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
|
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer = nullptr;
|
||||||
if (storage.use_table_fd)
|
if (storage.use_table_fd)
|
||||||
{
|
{
|
||||||
@ -488,7 +508,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
StorageFile & storage;
|
StorageFile & storage;
|
||||||
StorageMetadataPtr metadata_snapshot;
|
StorageMetadataPtr metadata_snapshot;
|
||||||
std::unique_lock<std::shared_mutex> lock;
|
std::unique_lock<std::shared_timed_mutex> lock;
|
||||||
std::unique_ptr<WriteBuffer> write_buf;
|
std::unique_ptr<WriteBuffer> write_buf;
|
||||||
BlockOutputStreamPtr writer;
|
BlockOutputStreamPtr writer;
|
||||||
bool prefix_written{false};
|
bool prefix_written{false};
|
||||||
@ -506,7 +526,7 @@ BlockOutputStreamPtr StorageFile::write(
|
|||||||
if (!paths.empty())
|
if (!paths.empty())
|
||||||
path = paths[0];
|
path = paths[0];
|
||||||
|
|
||||||
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
|
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)},
|
||||||
chooseCompressionMethod(path, compression_method), context);
|
chooseCompressionMethod(path, compression_method), context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -529,8 +549,6 @@ void StorageFile::rename(const String & new_path_to_table_data, const StorageID
|
|||||||
if (path_new == paths[0])
|
if (path_new == paths[0])
|
||||||
return;
|
return;
|
||||||
|
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
Poco::File(Poco::Path(path_new).parent()).createDirectories();
|
Poco::File(Poco::Path(path_new).parent()).createDirectories();
|
||||||
Poco::File(paths[0]).renameTo(path_new);
|
Poco::File(paths[0]).renameTo(path_new);
|
||||||
|
|
||||||
@ -547,8 +565,6 @@ void StorageFile::truncate(
|
|||||||
if (paths.size() != 1)
|
if (paths.size() != 1)
|
||||||
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
|
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
|
||||||
|
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
if (use_table_fd)
|
if (use_table_fd)
|
||||||
{
|
{
|
||||||
if (0 != ::ftruncate(table_fd, 0))
|
if (0 != ::ftruncate(table_fd, 0))
|
||||||
|
@ -89,7 +89,7 @@ private:
|
|||||||
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
|
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin
|
||||||
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
|
off_t table_fd_init_offset = -1; /// Initial position of fd, used for repeating reads
|
||||||
|
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_timed_mutex rwlock;
|
||||||
|
|
||||||
Poco::Logger * log = &Poco::Logger::get("StorageFile");
|
Poco::Logger * log = &Poco::Logger::get("StorageFile");
|
||||||
};
|
};
|
||||||
|
@ -39,6 +39,7 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
extern const int DUPLICATE_COLUMN;
|
extern const int DUPLICATE_COLUMN;
|
||||||
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
|
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
|
||||||
@ -50,7 +51,6 @@ namespace ErrorCodes
|
|||||||
class LogSource final : public SourceWithProgress
|
class LogSource final : public SourceWithProgress
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
static Block getHeader(const NamesAndTypesList & columns)
|
static Block getHeader(const NamesAndTypesList & columns)
|
||||||
{
|
{
|
||||||
Block res;
|
Block res;
|
||||||
@ -116,13 +116,16 @@ private:
|
|||||||
class LogBlockOutputStream final : public IBlockOutputStream
|
class LogBlockOutputStream final : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
|
explicit LogBlockOutputStream(
|
||||||
|
StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
|
||||||
: storage(storage_)
|
: storage(storage_)
|
||||||
, metadata_snapshot(metadata_snapshot_)
|
, metadata_snapshot(metadata_snapshot_)
|
||||||
, lock(storage.rwlock)
|
, lock(std::move(lock_))
|
||||||
, marks_stream(
|
, marks_stream(
|
||||||
storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
|
storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
|
||||||
{
|
{
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
~LogBlockOutputStream() override
|
~LogBlockOutputStream() override
|
||||||
@ -149,7 +152,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
StorageLog & storage;
|
StorageLog & storage;
|
||||||
StorageMetadataPtr metadata_snapshot;
|
StorageMetadataPtr metadata_snapshot;
|
||||||
std::unique_lock<std::shared_mutex> lock;
|
std::unique_lock<std::shared_timed_mutex> lock;
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
|
||||||
struct Stream
|
struct Stream
|
||||||
@ -507,9 +510,11 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StorageLog::loadMarks()
|
void StorageLog::loadMarks(std::chrono::seconds lock_timeout)
|
||||||
{
|
{
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
std::unique_lock lock(rwlock, lock_timeout);
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
if (loaded_marks)
|
if (loaded_marks)
|
||||||
return;
|
return;
|
||||||
@ -552,8 +557,6 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
|
|||||||
{
|
{
|
||||||
assert(table_path != new_path_to_table_data);
|
assert(table_path != new_path_to_table_data);
|
||||||
{
|
{
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
disk->moveDirectory(table_path, new_path_to_table_data);
|
disk->moveDirectory(table_path, new_path_to_table_data);
|
||||||
|
|
||||||
table_path = new_path_to_table_data;
|
table_path = new_path_to_table_data;
|
||||||
@ -569,8 +572,6 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
|
|||||||
|
|
||||||
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
|
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
files.clear();
|
files.clear();
|
||||||
file_count = 0;
|
file_count = 0;
|
||||||
loaded_marks = false;
|
loaded_marks = false;
|
||||||
@ -610,6 +611,17 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
|
|||||||
return it->second.marks;
|
return it->second.marks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::chrono::seconds getLockTimeout(const Context & context)
|
||||||
|
{
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
|
||||||
|
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
|
||||||
|
lock_timeout = settings.max_execution_time.totalSeconds();
|
||||||
|
return std::chrono::seconds{lock_timeout};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Pipe StorageLog::read(
|
Pipe StorageLog::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
@ -620,11 +632,15 @@ Pipe StorageLog::read(
|
|||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||||
loadMarks();
|
|
||||||
|
auto lock_timeout = getLockTimeout(context);
|
||||||
|
loadMarks(lock_timeout);
|
||||||
|
|
||||||
NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names));
|
NamesAndTypesList all_columns = Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names));
|
||||||
|
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock lock(rwlock, lock_timeout);
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
|
|
||||||
@ -653,18 +669,28 @@ Pipe StorageLog::read(
|
|||||||
max_read_buffer_size));
|
max_read_buffer_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
|
||||||
return Pipe::unitePipes(std::move(pipes));
|
return Pipe::unitePipes(std::move(pipes));
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
|
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
|
||||||
{
|
{
|
||||||
loadMarks();
|
auto lock_timeout = getLockTimeout(context);
|
||||||
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot);
|
loadMarks(lock_timeout);
|
||||||
|
|
||||||
|
std::unique_lock lock(rwlock, lock_timeout);
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
|
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
|
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & context)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock lock(rwlock, getLockTimeout(context));
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
return file_checker.check();
|
return file_checker.check();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ private:
|
|||||||
DiskPtr disk;
|
DiskPtr disk;
|
||||||
String table_path;
|
String table_path;
|
||||||
|
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_timed_mutex rwlock;
|
||||||
|
|
||||||
Files files;
|
Files files;
|
||||||
|
|
||||||
@ -104,7 +104,7 @@ private:
|
|||||||
/// Read marks files if they are not already read.
|
/// Read marks files if they are not already read.
|
||||||
/// It is done lazily, so that with a large number of tables, the server starts quickly.
|
/// It is done lazily, so that with a large number of tables, the server starts quickly.
|
||||||
/// You can not call with a write locked `rwlock`.
|
/// You can not call with a write locked `rwlock`.
|
||||||
void loadMarks();
|
void loadMarks(std::chrono::seconds lock_timeout);
|
||||||
|
|
||||||
/** For normal columns, the number of rows in the block is specified in the marks.
|
/** For normal columns, the number of rows in the block is specified in the marks.
|
||||||
* For array columns and nested structures, there are more than one group of marks that correspond to different files
|
* For array columns and nested structures, there are more than one group of marks that correspond to different files
|
||||||
|
@ -47,13 +47,13 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int INCORRECT_FILE_NAME;
|
extern const int INCORRECT_FILE_NAME;
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class StripeLogSource final : public SourceWithProgress
|
class StripeLogSource final : public SourceWithProgress
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
static Block getHeader(
|
static Block getHeader(
|
||||||
StorageStripeLog & storage,
|
StorageStripeLog & storage,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
@ -157,10 +157,11 @@ private:
|
|||||||
class StripeLogBlockOutputStream final : public IBlockOutputStream
|
class StripeLogBlockOutputStream final : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
|
explicit StripeLogBlockOutputStream(
|
||||||
|
StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
|
||||||
: storage(storage_)
|
: storage(storage_)
|
||||||
, metadata_snapshot(metadata_snapshot_)
|
, metadata_snapshot(metadata_snapshot_)
|
||||||
, lock(storage.rwlock)
|
, lock(std::move(lock_))
|
||||||
, data_out_file(storage.table_path + "data.bin")
|
, data_out_file(storage.table_path + "data.bin")
|
||||||
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
|
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
|
||||||
, data_out(std::make_unique<CompressedWriteBuffer>(
|
, data_out(std::make_unique<CompressedWriteBuffer>(
|
||||||
@ -170,6 +171,8 @@ public:
|
|||||||
, index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed))
|
, index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed))
|
||||||
, block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file))
|
, block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file))
|
||||||
{
|
{
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
~StripeLogBlockOutputStream() override
|
~StripeLogBlockOutputStream() override
|
||||||
@ -223,7 +226,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
StorageStripeLog & storage;
|
StorageStripeLog & storage;
|
||||||
StorageMetadataPtr metadata_snapshot;
|
StorageMetadataPtr metadata_snapshot;
|
||||||
std::unique_lock<std::shared_mutex> lock;
|
std::unique_lock<std::shared_timed_mutex> lock;
|
||||||
|
|
||||||
String data_out_file;
|
String data_out_file;
|
||||||
std::unique_ptr<WriteBuffer> data_out_compressed;
|
std::unique_ptr<WriteBuffer> data_out_compressed;
|
||||||
@ -286,8 +289,6 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
|
|||||||
{
|
{
|
||||||
assert(table_path != new_path_to_table_data);
|
assert(table_path != new_path_to_table_data);
|
||||||
{
|
{
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
disk->moveDirectory(table_path, new_path_to_table_data);
|
disk->moveDirectory(table_path, new_path_to_table_data);
|
||||||
|
|
||||||
table_path = new_path_to_table_data;
|
table_path = new_path_to_table_data;
|
||||||
@ -297,6 +298,16 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::chrono::seconds getLockTimeout(const Context & context)
|
||||||
|
{
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
|
||||||
|
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
|
||||||
|
lock_timeout = settings.max_execution_time.totalSeconds();
|
||||||
|
return std::chrono::seconds{lock_timeout};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Pipe StorageStripeLog::read(
|
Pipe StorageStripeLog::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
@ -306,7 +317,9 @@ Pipe StorageStripeLog::read(
|
|||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock lock(rwlock, getLockTimeout(context));
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||||
|
|
||||||
@ -345,24 +358,28 @@ Pipe StorageStripeLog::read(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
|
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
|
||||||
{
|
{
|
||||||
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot);
|
std::unique_lock lock(rwlock, getLockTimeout(context));
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
|
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot, std::move(lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
|
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & context)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock lock(rwlock, getLockTimeout(context));
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
return file_checker.check();
|
return file_checker.check();
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
|
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
disk->clearDirectory(table_path);
|
disk->clearDirectory(table_path);
|
||||||
|
|
||||||
file_checker = FileChecker{disk, table_path + "sizes.json"};
|
file_checker = FileChecker{disk, table_path + "sizes.json"};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ private:
|
|||||||
size_t max_compress_block_size;
|
size_t max_compress_block_size;
|
||||||
|
|
||||||
FileChecker file_checker;
|
FileChecker file_checker;
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_timed_mutex rwlock;
|
||||||
|
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
};
|
};
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
#include <IO/ReadBufferFromFileBase.h>
|
#include <IO/ReadBufferFromFileBase.h>
|
||||||
#include <IO/WriteBufferFromFileBase.h>
|
#include <IO/WriteBufferFromFileBase.h>
|
||||||
|
#include <IO/LimitReadBuffer.h>
|
||||||
#include <Compression/CompressionFactory.h>
|
#include <Compression/CompressionFactory.h>
|
||||||
#include <Compression/CompressedReadBuffer.h>
|
#include <Compression/CompressedReadBuffer.h>
|
||||||
#include <Compression/CompressedWriteBuffer.h>
|
#include <Compression/CompressedWriteBuffer.h>
|
||||||
@ -46,6 +47,7 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
|
extern const int TIMEOUT_EXCEEDED;
|
||||||
extern const int DUPLICATE_COLUMN;
|
extern const int DUPLICATE_COLUMN;
|
||||||
extern const int INCORRECT_FILE_NAME;
|
extern const int INCORRECT_FILE_NAME;
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
@ -55,7 +57,6 @@ namespace ErrorCodes
|
|||||||
class TinyLogSource final : public SourceWithProgress
|
class TinyLogSource final : public SourceWithProgress
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
||||||
static Block getHeader(const NamesAndTypesList & columns)
|
static Block getHeader(const NamesAndTypesList & columns)
|
||||||
{
|
{
|
||||||
Block res;
|
Block res;
|
||||||
@ -66,10 +67,17 @@ public:
|
|||||||
return Nested::flatten(res);
|
return Nested::flatten(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
TinyLogSource(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
|
TinyLogSource(
|
||||||
|
size_t block_size_,
|
||||||
|
const NamesAndTypesList & columns_,
|
||||||
|
StorageTinyLog & storage_,
|
||||||
|
size_t max_read_buffer_size_,
|
||||||
|
FileChecker::Map file_sizes_)
|
||||||
: SourceWithProgress(getHeader(columns_))
|
: SourceWithProgress(getHeader(columns_))
|
||||||
, block_size(block_size_), columns(columns_), storage(storage_), lock(storage_.rwlock)
|
, block_size(block_size_), columns(columns_), storage(storage_)
|
||||||
, max_read_buffer_size(max_read_buffer_size_) {}
|
, max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
String getName() const override { return "TinyLog"; }
|
String getName() const override { return "TinyLog"; }
|
||||||
|
|
||||||
@ -80,19 +88,21 @@ private:
|
|||||||
size_t block_size;
|
size_t block_size;
|
||||||
NamesAndTypesList columns;
|
NamesAndTypesList columns;
|
||||||
StorageTinyLog & storage;
|
StorageTinyLog & storage;
|
||||||
std::shared_lock<std::shared_mutex> lock;
|
|
||||||
bool is_finished = false;
|
bool is_finished = false;
|
||||||
size_t max_read_buffer_size;
|
size_t max_read_buffer_size;
|
||||||
|
FileChecker::Map file_sizes;
|
||||||
|
|
||||||
struct Stream
|
struct Stream
|
||||||
{
|
{
|
||||||
Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_)
|
Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size)
|
||||||
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
|
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
|
||||||
|
limited(std::make_unique<LimitReadBuffer>(*plain, file_size, false)),
|
||||||
compressed(*plain)
|
compressed(*plain)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> plain;
|
std::unique_ptr<ReadBuffer> plain;
|
||||||
|
std::unique_ptr<ReadBuffer> limited;
|
||||||
CompressedReadBuffer compressed;
|
CompressedReadBuffer compressed;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -110,9 +120,14 @@ private:
|
|||||||
class TinyLogBlockOutputStream final : public IBlockOutputStream
|
class TinyLogBlockOutputStream final : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit TinyLogBlockOutputStream(StorageTinyLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
|
explicit TinyLogBlockOutputStream(
|
||||||
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(storage_.rwlock)
|
StorageTinyLog & storage_,
|
||||||
|
const StorageMetadataPtr & metadata_snapshot_,
|
||||||
|
std::unique_lock<std::shared_timed_mutex> && lock_)
|
||||||
|
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(std::move(lock_))
|
||||||
{
|
{
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
~TinyLogBlockOutputStream() override
|
~TinyLogBlockOutputStream() override
|
||||||
@ -140,7 +155,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
StorageTinyLog & storage;
|
StorageTinyLog & storage;
|
||||||
StorageMetadataPtr metadata_snapshot;
|
StorageMetadataPtr metadata_snapshot;
|
||||||
std::unique_lock<std::shared_mutex> lock;
|
std::unique_lock<std::shared_timed_mutex> lock;
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
|
||||||
struct Stream
|
struct Stream
|
||||||
@ -231,13 +246,17 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum
|
|||||||
String stream_name = IDataType::getFileNameForStream(name, path);
|
String stream_name = IDataType::getFileNameForStream(name, path);
|
||||||
|
|
||||||
if (!streams.count(stream_name))
|
if (!streams.count(stream_name))
|
||||||
streams[stream_name] = std::make_unique<Stream>(storage.disk, storage.files[stream_name].data_file_path, max_read_buffer_size);
|
{
|
||||||
|
String file_path = storage.files[stream_name].data_file_path;
|
||||||
|
streams[stream_name] = std::make_unique<Stream>(
|
||||||
|
storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]);
|
||||||
|
}
|
||||||
|
|
||||||
return &streams[stream_name]->compressed;
|
return &streams[stream_name]->compressed;
|
||||||
};
|
};
|
||||||
|
|
||||||
if (deserialize_states.count(name) == 0)
|
if (deserialize_states.count(name) == 0)
|
||||||
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
|
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
|
||||||
|
|
||||||
type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
|
type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
|
||||||
}
|
}
|
||||||
@ -410,8 +429,6 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
|
|||||||
{
|
{
|
||||||
assert(table_path != new_path_to_table_data);
|
assert(table_path != new_path_to_table_data);
|
||||||
{
|
{
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
disk->moveDirectory(table_path, new_path_to_table_data);
|
disk->moveDirectory(table_path, new_path_to_table_data);
|
||||||
|
|
||||||
table_path = new_path_to_table_data;
|
table_path = new_path_to_table_data;
|
||||||
@ -424,6 +441,16 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::chrono::seconds getLockTimeout(const Context & context)
|
||||||
|
{
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
|
||||||
|
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
|
||||||
|
lock_timeout = settings.max_execution_time.totalSeconds();
|
||||||
|
return std::chrono::seconds{lock_timeout};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Pipe StorageTinyLog::read(
|
Pipe StorageTinyLog::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
@ -437,28 +464,40 @@ Pipe StorageTinyLog::read(
|
|||||||
|
|
||||||
// When reading, we lock the entire storage, because we only have one file
|
// When reading, we lock the entire storage, because we only have one file
|
||||||
// per column and can't modify it concurrently.
|
// per column and can't modify it concurrently.
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
|
||||||
|
std::shared_lock lock{rwlock, getLockTimeout(context)};
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
|
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
|
||||||
return Pipe(std::make_shared<TinyLogSource>(
|
return Pipe(std::make_shared<TinyLogSource>(
|
||||||
max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
|
max_block_size,
|
||||||
|
Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)),
|
||||||
|
*this,
|
||||||
|
settings.max_read_buffer_size,
|
||||||
|
file_checker.getFileSizes()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
|
BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
|
||||||
{
|
{
|
||||||
return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot);
|
return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot, std::unique_lock{rwlock, getLockTimeout(context)});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
|
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & context)
|
||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock lock(rwlock, getLockTimeout(context));
|
||||||
|
if (!lock)
|
||||||
|
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||||
|
|
||||||
return file_checker.check();
|
return file_checker.check();
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageTinyLog::truncate(
|
void StorageTinyLog::truncate(
|
||||||
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
|
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
|
|
||||||
disk->clearDirectory(table_path);
|
disk->clearDirectory(table_path);
|
||||||
|
|
||||||
files.clear();
|
files.clear();
|
||||||
@ -468,14 +507,6 @@ void StorageTinyLog::truncate(
|
|||||||
addFiles(column.name, *column.type);
|
addFiles(column.name, *column.type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageTinyLog::drop()
|
|
||||||
{
|
|
||||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
|
||||||
if (disk->exists(table_path))
|
|
||||||
disk->removeRecursive(table_path);
|
|
||||||
files.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void registerStorageTinyLog(StorageFactory & factory)
|
void registerStorageTinyLog(StorageFactory & factory)
|
||||||
{
|
{
|
||||||
|
@ -43,8 +43,6 @@ public:
|
|||||||
|
|
||||||
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
|
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
|
||||||
|
|
||||||
void drop() override;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageTinyLog(
|
StorageTinyLog(
|
||||||
DiskPtr disk_,
|
DiskPtr disk_,
|
||||||
@ -70,7 +68,7 @@ private:
|
|||||||
Files files;
|
Files files;
|
||||||
|
|
||||||
FileChecker file_checker;
|
FileChecker file_checker;
|
||||||
mutable std::shared_mutex rwlock;
|
mutable std::shared_timed_mutex rwlock;
|
||||||
|
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
|
||||||
|
3
tests/queries/0_stateless/01499_log_deadlock.reference
Normal file
3
tests/queries/0_stateless/01499_log_deadlock.reference
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
6
|
||||||
|
6
|
||||||
|
6
|
26
tests/queries/0_stateless/01499_log_deadlock.sql
Normal file
26
tests/queries/0_stateless/01499_log_deadlock.sql
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
DROP TABLE IF EXISTS t;
|
||||||
|
CREATE TABLE t (x UInt8) ENGINE = TinyLog;
|
||||||
|
|
||||||
|
INSERT INTO t VALUES (1), (2), (3);
|
||||||
|
INSERT INTO t SELECT * FROM t;
|
||||||
|
SELECT count() FROM t;
|
||||||
|
|
||||||
|
DROP TABLE t;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE t (x UInt8) ENGINE = Log;
|
||||||
|
|
||||||
|
INSERT INTO t VALUES (1), (2), (3);
|
||||||
|
INSERT INTO t SELECT * FROM t;
|
||||||
|
SELECT count() FROM t;
|
||||||
|
|
||||||
|
DROP TABLE t;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE t (x UInt8) ENGINE = StripeLog;
|
||||||
|
|
||||||
|
INSERT INTO t VALUES (1), (2), (3);
|
||||||
|
INSERT INTO t SELECT * FROM t;
|
||||||
|
SELECT count() FROM t;
|
||||||
|
|
||||||
|
DROP TABLE t;
|
@ -0,0 +1,6 @@
|
|||||||
|
Testing TinyLog
|
||||||
|
Done TinyLog
|
||||||
|
Testing StripeLog
|
||||||
|
Done StripeLog
|
||||||
|
Testing Log
|
||||||
|
Done Log
|
85
tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh
Executable file
85
tests/queries/0_stateless/01502_long_log_tinylog_deadlock_race.sh
Executable file
@ -0,0 +1,85 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=fatal
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
|
||||||
|
function thread_create {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "CREATE TABLE IF NOT EXISTS $1 (x UInt64, s Array(Nullable(String))) ENGINE = $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function thread_drop {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS $1" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function thread_rename {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "RENAME TABLE $1 TO $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|57)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function thread_select {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT * FROM $1 FORMAT Null" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function thread_insert {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT rand64(1), [toString(rand64(2))] FROM numbers($2)" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function thread_insert_select {
|
||||||
|
while true; do
|
||||||
|
$CLICKHOUSE_CLIENT --query "INSERT INTO $1 SELECT * FROM $2" 2>&1 | grep -v -F 'Received exception from server' | grep -v -P 'Code: (60|218)'
|
||||||
|
sleep 0.0$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
export -f thread_create
|
||||||
|
export -f thread_drop
|
||||||
|
export -f thread_rename
|
||||||
|
export -f thread_select
|
||||||
|
export -f thread_insert
|
||||||
|
export -f thread_insert_select
|
||||||
|
|
||||||
|
|
||||||
|
# Do randomized queries and expect nothing extraordinary happens.
|
||||||
|
|
||||||
|
function test_with_engine {
|
||||||
|
echo "Testing $1"
|
||||||
|
|
||||||
|
timeout 10 bash -c "thread_create t1 $1" &
|
||||||
|
timeout 10 bash -c "thread_create t2 $1" &
|
||||||
|
timeout 10 bash -c 'thread_drop t1' &
|
||||||
|
timeout 10 bash -c 'thread_drop t2' &
|
||||||
|
timeout 10 bash -c 'thread_rename t1 t2' &
|
||||||
|
timeout 10 bash -c 'thread_rename t2 t1' &
|
||||||
|
timeout 10 bash -c 'thread_select t1' &
|
||||||
|
timeout 10 bash -c 'thread_select t2' &
|
||||||
|
timeout 10 bash -c 'thread_insert t1 5' &
|
||||||
|
timeout 10 bash -c 'thread_insert t2 10' &
|
||||||
|
timeout 10 bash -c 'thread_insert_select t1 t2' &
|
||||||
|
timeout 10 bash -c 'thread_insert_select t2 t1' &
|
||||||
|
|
||||||
|
wait
|
||||||
|
echo "Done $1"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_with_engine TinyLog
|
||||||
|
test_with_engine StripeLog
|
||||||
|
test_with_engine Log
|
12
tests/queries/0_stateless/01505_log_distributed_deadlock.sql
Normal file
12
tests/queries/0_stateless/01505_log_distributed_deadlock.sql
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
DROP TABLE IF EXISTS t_local;
|
||||||
|
DROP TABLE IF EXISTS t_dist;
|
||||||
|
|
||||||
|
create table t_local(a int) engine Log;
|
||||||
|
create table t_dist (a int) engine Distributed(test_shard_localhost, currentDatabase(), 't_local', cityHash64(a));
|
||||||
|
|
||||||
|
set insert_distributed_sync = 1;
|
||||||
|
|
||||||
|
insert into t_dist values (1);
|
||||||
|
|
||||||
|
DROP TABLE t_local;
|
||||||
|
DROP TABLE t_dist;
|
@ -145,3 +145,4 @@
|
|||||||
01461_query_start_time_microseconds
|
01461_query_start_time_microseconds
|
||||||
01455_shard_leaf_max_rows_bytes_to_read
|
01455_shard_leaf_max_rows_bytes_to_read
|
||||||
01505_distributed_local_type_conversion_enum
|
01505_distributed_local_type_conversion_enum
|
||||||
|
01505_log_distributed_deadlock
|
||||||
|
Loading…
Reference in New Issue
Block a user