fix reading from StorageLog with mmap

This commit is contained in:
Anton Popov 2022-07-13 20:35:24 +00:00
parent da2413f7d2
commit 089009c996
6 changed files with 41 additions and 35 deletions

View File

@ -126,7 +126,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
std::unique_lock lock(write_mutex);
MergeTreeData::MutableDataPartsVector parts;
auto in = disk->readFile(path, {});
auto in = disk->readFile(path);
NativeReader block_in(*in, 0);
NameSet dropped_parts;

View File

@ -535,15 +535,16 @@ StorageLog::StorageLog(
const ConstraintsDescription & constraints_,
const String & comment,
bool attach,
size_t max_compress_block_size_)
ContextMutablePtr context_)
: IStorage(table_id_)
, WithMutableContext(context_)
, engine_name(engine_name_)
, disk(std::move(disk_))
, table_path(relative_path_)
, use_marks_file(engine_name == "Log")
, marks_file_path(table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME)
, file_checker(disk, table_path + "sizes.json")
, max_compress_block_size(max_compress_block_size_)
, max_compress_block_size(context_->getSettingsRef().max_compress_block_size)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -750,9 +751,9 @@ static std::chrono::seconds getLockTimeout(ContextPtr context)
return std::chrono::seconds{lock_timeout};
}
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
WriteLock lock{rwlock, getLockTimeout(context)};
WriteLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -769,6 +770,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr
marks_loaded = true;
num_marks_saved = 0;
getContext()->dropMMappedFileCache();
}
@ -776,14 +778,14 @@ Pipe StorageLog::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
storage_snapshot->check(column_names);
auto lock_timeout = getLockTimeout(context);
auto lock_timeout = getLockTimeout(local_context);
loadMarks(lock_timeout);
ReadLock lock{rwlock, lock_timeout};
@ -817,7 +819,7 @@ Pipe StorageLog::read(
bool limited_by_file_sizes = !use_marks_file;
size_t row_limit = std::numeric_limits<size_t>::max();
ReadSettings read_settings = context->getReadSettings();
ReadSettings read_settings = local_context->getReadSettings();
Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
@ -848,18 +850,18 @@ Pipe StorageLog::read(
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
WriteLock lock{rwlock, getLockTimeout(context)};
WriteLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return std::make_shared<LogSink>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr context)
CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context)
{
ReadLock lock{rwlock, getLockTimeout(context)};
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -1114,7 +1116,7 @@ void registerStorageLog(StorageFactory & factory)
args.constraints,
args.comment,
args.attach,
args.getContext()->getSettings().max_compress_block_size);
args.getContext());
};
factory.registerStorage("Log", create_fn, features);

View File

@ -12,6 +12,7 @@
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
@ -21,7 +22,7 @@ using BackupPtr = std::shared_ptr<const IBackup>;
* Also implements TinyLog - a table engine that is suitable for small chunks of the log.
* It differs from Log in the absence of mark files.
*/
class StorageLog final : public IStorage
class StorageLog final : public IStorage, public WithMutableContext
{
friend class LogSource;
friend class LogSink;
@ -40,7 +41,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
bool attach,
size_t max_compress_block_size_);
ContextMutablePtr context_);
~StorageLog() override;
String getName() const override { return engine_name; }
@ -49,16 +50,16 @@ public:
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) override;
CheckResults checkData(const ASTPtr & query, ContextPtr local_context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;

View File

@ -265,14 +265,15 @@ StorageStripeLog::StorageStripeLog(
const ConstraintsDescription & constraints_,
const String & comment,
bool attach,
size_t max_compress_block_size_)
ContextMutablePtr context_)
: IStorage(table_id_)
, WithMutableContext(context_)
, disk(std::move(disk_))
, table_path(relative_path_)
, data_file_path(table_path + "data.bin")
, index_file_path(table_path + "index.mrk")
, file_checker(disk, table_path + "sizes.json")
, max_compress_block_size(max_compress_block_size_)
, max_compress_block_size(context_->getSettings().max_compress_block_size)
, log(&Poco::Logger::get("StorageStripeLog"))
{
StorageInMemoryMetadata storage_metadata;
@ -330,9 +331,9 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
}
static std::chrono::seconds getLockTimeout(ContextPtr context)
static std::chrono::seconds getLockTimeout(ContextPtr local_context)
{
const Settings & settings = context->getSettingsRef();
const Settings & settings = local_context->getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
@ -344,14 +345,14 @@ Pipe StorageStripeLog::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
unsigned num_streams)
{
storage_snapshot->check(column_names);
auto lock_timeout = getLockTimeout(context);
auto lock_timeout = getLockTimeout(local_context);
loadIndices(lock_timeout);
ReadLock lock{rwlock, lock_timeout};
@ -369,7 +370,7 @@ Pipe StorageStripeLog::read(
if (num_streams > size)
num_streams = size;
ReadSettings read_settings = context->getReadSettings();
ReadSettings read_settings = local_context->getReadSettings();
Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
@ -390,9 +391,9 @@ Pipe StorageStripeLog::read(
}
SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
WriteLock lock{rwlock, getLockTimeout(context)};
WriteLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -400,9 +401,9 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage
}
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr context)
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context)
{
ReadLock lock{rwlock, getLockTimeout(context)};
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -420,6 +421,7 @@ void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
indices_loaded = true;
num_indices_saved = 0;
getContext()->dropMMappedFileCache();
}
@ -686,7 +688,7 @@ void registerStorageStripeLog(StorageFactory & factory)
args.constraints,
args.comment,
args.attach,
args.getContext()->getSettings().max_compress_block_size);
args.getContext());
}, features);
}

View File

@ -20,7 +20,7 @@ using BackupPtr = std::shared_ptr<const IBackup>;
/** Implements a table engine that is suitable for small chunks of the log.
* In doing so, stores all the columns in a single Native file, with a nearby index.
*/
class StorageStripeLog final : public IStorage
class StorageStripeLog final : public IStorage, public WithMutableContext
{
friend class StripeLogSource;
friend class StripeLogSink;
@ -34,7 +34,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
bool attach,
size_t max_compress_block_size_);
ContextMutablePtr context_);
~StorageStripeLog() override;
@ -44,16 +44,16 @@ public:
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) override;
CheckResults checkData(const ASTPtr & query, ContextPtr local_context) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }

View File

@ -4,6 +4,7 @@ DROP TABLE IF EXISTS nested;
SET flatten_nested = 0;
SET use_uncompressed_cache = 0;
SET local_filesystem_read_method='pread';
CREATE TABLE nested
(