Fix implementation of totalBytes() & totalRows() for Log family.

Vitaly Baranov 2022-05-03 19:55:45 +02:00
parent 202dd864ed
commit 160bc288d3
9 changed files with 220 additions and 190 deletions
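In short: totalRows() and totalBytes() previously grabbed the table's rwlock (and lazily loaded marks or indices) on every call, and could even throw a lock-timeout error from a metadata query; the removed implementations near the bottom of the StorageLog.cpp and StorageStripeLog.cpp diffs below show this. After this commit the totals are kept in std::atomic counters that are refreshed under the write lock whenever marks, indices, or file sizes change, so the accessors become lock-free reads. A condensed sketch of the pattern, using simplified stand-in types rather than the real ClickHouse classes:

#include <atomic>
#include <cstdint>
#include <optional>

/// Hypothetical stand-in for the members this commit adds to StorageLog.
class CachedTotalsSketch
{
public:
    /// Lock-free: an exact row count is available once the marks are loaded;
    /// a zero byte total still lets us report exactly 0 rows.
    std::optional<uint64_t> totalRows() const
    {
        if (marks_loaded)
            return total_rows.load();
        if (total_bytes.load() == 0)
            return 0;
        return {}; /// unknown
    }

    /// Lock-free: refreshed every time file sizes are saved.
    std::optional<uint64_t> totalBytes() const { return total_bytes.load(); }

    /// Called with the write lock held, after the marks change
    /// (insert finishes, truncate, restore from backup, lazy load).
    void updateTotalRows(uint64_t rows_from_last_mark)
    {
        total_rows = rows_from_last_mark;
        marks_loaded = true;
    }

private:
    std::atomic<bool> marks_loaded{false};
    std::atomic<uint64_t> total_rows{0};
    std::atomic<uint64_t> total_bytes{0};
};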

View File

@@ -34,12 +34,12 @@ namespace
if (only_shard_num)
{
if ((only_shard_num <= cluster_host_ids.size()) && (cluster_host_ids[only_shard_num - 1].size() > 1))
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Backup of multiple replicas is disabled, choose one replica with the replica_num setting or specify allow_storing_multiple_replicas=true");
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Backup of multiple replicas is disabled. Choose one replica with the replica_num setting or specify allow_storing_multiple_replicas=true");
}
for (size_t i = 0; i != cluster_host_ids.size(); ++i)
for (const auto & shard : cluster_host_ids)
{
if (cluster_host_ids[i].size() > 1)
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Backup of multiple replicas is disabled, choose one replica with the replica_num setting or specify allow_storing_multiple_replicas=true");
if (shard.size() > 1)
throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Backup of multiple replicas is disabled. Choose one replica with the replica_num setting or specify allow_storing_multiple_replicas=true");
}
}

View File

@@ -69,9 +69,9 @@ size_t FileChecker::getFileSize(const String & full_file_path) const
return it->second;
}
UInt64 FileChecker::getTotalSize() const
size_t FileChecker::getTotalSize() const
{
UInt64 total_size = 0;
size_t total_size = 0;
for (auto size : map | boost::adaptors::map_values)
total_size += size;
return total_size;
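For context, getTotalSize() only folds the recorded per-file sizes; the hunk above merely changes the accumulator type from UInt64 to size_t. A standalone equivalent of the loop, assuming a plain std::map in place of FileChecker's internal map:

#include <boost/range/adaptor/map.hpp>
#include <map>
#include <string>

/// Sums the values of an assumed name -> size map, as FileChecker::getTotalSize() does.
size_t totalSize(const std::map<std::string, size_t> & file_sizes)
{
    size_t total = 0;
    for (auto size : file_sizes | boost::adaptors::map_values)
        total += size;
    return total;
}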

View File

@@ -63,7 +63,7 @@ public:
Block res;
for (const auto & name_type : columns)
res.insert({name_type.type->createColumn(), name_type.type, name_type.name});
res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });
return res;
}
@@ -98,7 +98,7 @@ private:
const size_t block_size;
const NamesAndTypesList columns;
const StorageLog & storage;
const size_t rows_limit; /// The maximum number of rows that can be read
const size_t rows_limit; /// The maximum number of rows that can be read
size_t rows_read = 0;
bool is_finished = false;
const std::vector<size_t> offsets;
@@ -108,13 +108,7 @@ private:
struct Stream
{
Stream(
const DiskPtr & disk,
const String & data_path,
size_t offset,
size_t file_size,
bool limited_by_file_size,
ReadSettings read_settings_)
Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t file_size, bool limited_by_file_size, ReadSettings read_settings_)
{
plain = disk->readFile(data_path, read_settings_.adjustBufferSize(file_size));
@@ -142,8 +136,7 @@ private:
using DeserializeStates = std::map<String, DeserializeState>;
DeserializeStates deserialize_states;
void
readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache);
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache);
bool isFinished();
};
@@ -201,8 +194,8 @@ Chunk LogSource::generate()
}
void LogSource::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache)
void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t max_rows_to_read, ISerialization::SubstreamsCache & cache)
{
ISerialization::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
const auto & [name, type] = name_and_type;
@@ -210,7 +203,7 @@ void LogSource::readData(
auto create_stream_getter = [&](bool stream_for_prefix)
{
return [&, stream_for_prefix](const ISerialization::SubstreamPath & path) -> ReadBuffer * //-V1047
return [&, stream_for_prefix] (const ISerialization::SubstreamPath & path) -> ReadBuffer * //-V1047
{
if (cache.contains(ISerialization::getSubcolumnNameForStream(path)))
return nullptr;
@@ -225,9 +218,7 @@ void LogSource::readData(
size_t offset = stream_for_prefix ? 0 : offsets[data_file.index];
size_t file_size = file_sizes[data_file.index];
auto it
= streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings)
.first;
auto it = streams.try_emplace(data_file_name, storage.disk, data_file.path, offset, file_size, limited_by_file_sizes, read_settings).first;
return &it->second.compressed.value();
};
};
@@ -274,7 +265,8 @@ class LogSink final : public SinkToStorage
public:
using WriteLock = std::unique_lock<std::shared_timed_mutex>;
explicit LogSink(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, WriteLock && lock_)
explicit LogSink(
StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, WriteLock && lock_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
@@ -327,15 +319,10 @@ private:
struct Stream
{
Stream(
const DiskPtr & disk,
const String & data_path,
size_t initial_data_size,
CompressionCodecPtr codec,
size_t max_compress_block_size)
: plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append))
, compressed(*plain, std::move(codec), max_compress_block_size)
, plain_offset(initial_data_size)
Stream(const DiskPtr & disk, const String & data_path, size_t initial_data_size, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append)),
compressed(*plain, std::move(codec), max_compress_block_size),
plain_offset(initial_data_size)
{
}
@@ -411,6 +398,7 @@ void LogSink::onFinish()
storage.saveMarks(lock);
storage.saveFileSizes(lock);
storage.updateTotalRows(lock);
done = true;
@@ -423,12 +411,13 @@ void LogSink::onFinish()
ISerialization::OutputStreamGetter LogSink::createStreamGetter(const NameAndTypePair & name_and_type)
{
return [&](const ISerialization::SubstreamPath & path) -> WriteBuffer *
return [&] (const ISerialization::SubstreamPath & path) -> WriteBuffer *
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
auto it = streams.find(data_file_name);
if (it == streams.end())
throw Exception("Logical error: stream was not created when writing data in LogSink", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: stream was not created when writing data in LogSink",
ErrorCodes::LOGICAL_ERROR);
Stream & stream = it->second;
if (stream.written)
@@ -445,74 +434,65 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & column)
const auto & [name, type] = name_and_type;
auto serialization = type->getDefaultSerialization();
serialization->enumerateStreams(
[&](const ISerialization::SubstreamPath & path)
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & path)
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
auto it = streams.find(data_file_name);
if (it == streams.end())
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
auto it = streams.find(data_file_name);
if (it == streams.end())
{
const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
if (data_file_it == storage.data_files_by_names.end())
throw Exception(
"Logical error: no information about file " + data_file_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
const auto & data_file_it = storage.data_files_by_names.find(data_file_name);
if (data_file_it == storage.data_files_by_names.end())
throw Exception("Logical error: no information about file " + data_file_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
const auto & data_file = *data_file_it->second;
const auto & columns = metadata_snapshot->getColumns();
const auto & data_file = *data_file_it->second;
const auto & columns = metadata_snapshot->getColumns();
it = streams
.try_emplace(
data_file.name,
storage.disk,
data_file.path,
storage.file_checker.getFileSize(data_file.path),
columns.getCodecOrDefault(name_and_type.name),
storage.max_compress_block_size)
.first;
}
it = streams.try_emplace(data_file.name, storage.disk, data_file.path,
storage.file_checker.getFileSize(data_file.path),
columns.getCodecOrDefault(name_and_type.name),
storage.max_compress_block_size).first;
}
auto & stream = it->second;
if (stream.written)
return;
});
auto & stream = it->second;
if (stream.written)
return;
});
settings.getter = createStreamGetter(name_and_type);
if (!serialize_states.contains(name))
serialization->serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
serialization->serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
if (storage.use_marks_file)
{
serialization->enumerateStreams(
[&](const ISerialization::SubstreamPath & path)
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
const auto & stream = streams.at(data_file_name);
if (stream.written)
return;
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & path)
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
const auto & stream = streams.at(data_file_name);
if (stream.written)
return;
auto & data_file = *storage.data_files_by_names.at(data_file_name);
auto & marks = data_file.marks;
size_t prev_num_rows = marks.empty() ? 0 : marks.back().rows;
auto & mark = marks.emplace_back();
mark.rows = prev_num_rows + column.size();
mark.offset = stream.plain_offset + stream.plain->count();
});
auto & data_file = *storage.data_files_by_names.at(data_file_name);
auto & marks = data_file.marks;
size_t prev_num_rows = marks.empty() ? 0 : marks.back().rows;
auto & mark = marks.emplace_back();
mark.rows = prev_num_rows + column.size();
mark.offset = stream.plain_offset + stream.plain->count();
});
}
serialization->serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
serialization->enumerateStreams(
[&](const ISerialization::SubstreamPath & path)
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
auto & stream = streams.at(data_file_name);
if (stream.written)
return;
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & path)
{
String data_file_name = ISerialization::getFileNameForStream(name_and_type, path);
auto & stream = streams.at(data_file_name);
if (stream.written)
return;
stream.written = true;
stream.compressed.next();
});
stream.written = true;
stream.compressed.next();
});
}
@@ -602,15 +582,18 @@ StorageLog::StorageLog(
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
total_bytes = file_checker.getTotalSize();
}
void StorageLog::addDataFiles(const NameAndTypePair & column)
{
if (data_files_by_names.contains(column.name))
throw Exception("Duplicate column with name " + column.name + " in constructor of StorageLog.", ErrorCodes::DUPLICATE_COLUMN);
throw Exception("Duplicate column with name " + column.name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
ISerialization::StreamCallback stream_callback = [&](const ISerialization::SubstreamPath & substream_path)
ISerialization::StreamCallback stream_callback = [&] (const ISerialization::SubstreamPath & substream_path)
{
String data_file_name = ISerialization::getFileNameForStream(column, substream_path);
if (!data_files_by_names.contains(data_file_name))
@@ -630,7 +613,7 @@ void StorageLog::addDataFiles(const NameAndTypePair & column)
}
void StorageLog::loadMarks(std::chrono::seconds lock_timeout) const
void StorageLog::loadMarks(std::chrono::seconds lock_timeout)
{
if (!use_marks_file || marks_loaded)
return;
@@ -644,7 +627,7 @@ void StorageLog::loadMarks(std::chrono::seconds lock_timeout) const
loadMarks(lock);
}
void StorageLog::loadMarks(const WriteLock & /* already locked exclusively */) const
void StorageLog::loadMarks(const WriteLock & lock /* already locked exclusively */)
{
if (!use_marks_file || marks_loaded)
return;
@@ -675,6 +658,9 @@ void StorageLog::loadMarks(const WriteLock & /* already locked exclusively */) const
marks_loaded = true;
num_marks_saved = num_marks;
/// We need marks to calculate the number of rows, and now we have the marks.
updateTotalRows(lock);
}
void StorageLog::saveMarks(const WriteLock & /* already locked for writing */)
@@ -733,6 +719,7 @@ void StorageLog::saveFileSizes(const WriteLock & /* already locked for writing */)
file_checker.update(marks_file_path);
file_checker.save();
total_bytes = file_checker.getTotalSize();
}
@@ -753,19 +740,15 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
renameInMemory(new_table_id);
}
static std::chrono::seconds getLockTimeout(const Settings & settings)
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
return std::chrono::seconds{lock_timeout};
}
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
return getLockTimeout(context->getSettingsRef());
}
void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
{
WriteLock lock{rwlock, getLockTimeout(context)};
@@ -850,7 +833,14 @@ Pipe StorageLog::read(
}
pipes.emplace_back(std::make_shared<LogSource>(
max_block_size, all_columns, *this, row_limit, offsets, file_sizes, limited_by_file_sizes, read_settings));
max_block_size,
all_columns,
*this,
row_limit,
offsets,
file_sizes,
limited_by_file_sizes,
read_settings));
}
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.
@@ -886,7 +876,7 @@ IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
for (const auto & column : getInMemoryMetadata().getColumns().getAllPhysical())
{
ISerialization::StreamCallback stream_callback = [&, this](const ISerialization::SubstreamPath & substream_path)
ISerialization::StreamCallback stream_callback = [&, this] (const ISerialization::SubstreamPath & substream_path)
{
String data_file_name = ISerialization::getFileNameForStream(column, substream_path);
auto it = data_files_by_names.find(data_file_name);
@@ -904,6 +894,32 @@ IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
return column_sizes;
}
void StorageLog::updateTotalRows(const WriteLock &)
{
if (!use_marks_file || !marks_loaded)
return;
if (num_data_files)
total_rows = data_files[INDEX_WITH_REAL_ROW_COUNT].marks.empty() ? 0 : data_files[INDEX_WITH_REAL_ROW_COUNT].marks.back().rows;
else
total_rows = 0;
}
std::optional<UInt64> StorageLog::totalRows(const Settings &) const
{
if (use_marks_file && marks_loaded)
return total_rows;
if (!total_bytes)
return 0;
return {};
}
std::optional<UInt64> StorageLog::totalBytes(const Settings &) const
{
return total_bytes;
}
BackupEntries StorageLog::backupData(ContextPtr context, const ASTs & partitions)
{
@@ -1051,7 +1067,7 @@ public:
{
Mark mark;
mark.read(*marks_rb);
mark.rows += old_num_rows[j]; /// Adjust the number of rows.
mark.rows += old_num_rows[j]; /// Adjust the number of rows.
mark.offset += old_data_sizes[j]; /// Adjust the offset.
data_files[j].marks.push_back(mark);
}
@@ -1061,6 +1077,7 @@ public:
/// Finish writing.
storage->saveMarks(lock);
storage->saveFileSizes(lock);
storage->updateTotalRows(lock);
}
catch (...)
{
@@ -1080,13 +1097,7 @@ private:
ContextMutablePtr context;
};
RestoreTaskPtr StorageLog::restoreData(
ContextMutablePtr context,
const ASTs & partitions,
const BackupPtr & backup,
const String & data_path_in_backup,
const StorageRestoreSettings &,
const std::shared_ptr<IRestoreCoordination> &)
RestoreTaskPtr StorageLog::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
if (!partitions.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());
@@ -1096,32 +1107,6 @@ RestoreTaskPtr StorageLog::restoreData(
}
std::optional<UInt64> StorageLog::totalRows(const Settings & settings) const
{
if (!use_marks_file)
return {};
auto lock_timeout = getLockTimeout(settings);
loadMarks(lock_timeout);
ReadLock lock{rwlock, lock_timeout};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return data_files[INDEX_WITH_REAL_ROW_COUNT].marks.empty() ? 0 : data_files[INDEX_WITH_REAL_ROW_COUNT].marks.back().rows;
}
std::optional<UInt64> StorageLog::totalBytes(const Settings & settings) const
{
ReadLock lock{rwlock, getLockTimeout(settings)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.getTotalSize();
}
void registerStorageLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{

View File

@@ -52,13 +52,13 @@ public:
bool supportsSubcolumns() const override { return true; }
ColumnSizeByName getColumnSizes() const override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
bool hasDataToBackup() const override { return true; }
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
protected:
/** Attach the table with the appropriate name, along the appropriate path (with / at the end),
* (the correctness of names and paths is not verified)
@@ -85,8 +85,8 @@ private:
/// Reads the marks file if it hasn't read yet.
/// It is done lazily, so that with a large number of tables, the server starts quickly.
void loadMarks(std::chrono::seconds lock_timeout) const;
void loadMarks(const WriteLock &) const;
void loadMarks(std::chrono::seconds lock_timeout);
void loadMarks(const WriteLock &);
/// Saves the marks file.
void saveMarks(const WriteLock &);
@@ -97,6 +97,9 @@ private:
/// Saves the sizes of the data and marks files.
void saveFileSizes(const WriteLock &);
/// Recalculates the number of rows stored in this table.
void updateTotalRows(const WriteLock &);
/** Offsets to some row number in a file for column in table.
* They are needed so that you can read the data in several threads.
*/
@@ -116,7 +119,7 @@ private:
size_t index;
String name;
String path;
mutable Marks marks;
Marks marks;
};
const String engine_name;
@@ -131,8 +134,11 @@ private:
const bool use_marks_file;
String marks_file_path;
mutable std::atomic<bool> marks_loaded = false;
mutable size_t num_marks_saved = 0;
std::atomic<bool> marks_loaded = false;
size_t num_marks_saved = 0;
std::atomic<UInt64> total_rows = 0;
std::atomic<UInt64> total_bytes = 0;
FileChecker file_checker;
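The header changes follow from the new read path: since the const accessors no longer load marks lazily, loadMarks() and the cached members lose their mutable qualifiers, and the totals become atomics that const methods can read without taking the lock. A minimal illustration of the two shapes (hypothetical class names):

#include <atomic>
#include <cstddef>
#include <cstdint>

/// Before: logically-const lazy loading forces `mutable` on the cached state.
class BeforeSketch
{
    void loadMarks() const; /// mutates members despite being const
    mutable std::atomic<bool> marks_loaded{false};
    mutable size_t num_marks_saved = 0;
};

/// After: loading is an ordinary mutation done under the write lock;
/// const accessors only read the atomics.
class AfterSketch
{
    void loadMarks(); /// non-const now
    std::atomic<bool> marks_loaded{false};
    size_t num_marks_saved = 0;
    std::atomic<uint64_t> total_rows{0};
    std::atomic<uint64_t> total_bytes{0};
};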

View File

@@ -449,16 +449,22 @@ RestoreTaskPtr StorageMaterializedView::restoreData(ContextMutablePtr context_,
std::optional<UInt64> StorageMaterializedView::totalRows(const Settings & settings) const
{
if (!hasInnerTable())
return 0;
return getTargetTable()->totalRows(settings);
if (hasInnerTable())
{
if (auto table = tryGetTargetTable())
return table->totalRows(settings);
}
return {};
}
std::optional<UInt64> StorageMaterializedView::totalBytes(const Settings & settings) const
{
if (!hasInnerTable())
return 0;
return getTargetTable()->totalBytes(settings);
if (hasInnerTable())
{
if (auto table = tryGetTargetTable())
return table->totalBytes(settings);
}
return {};
}
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
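A behavioral nuance here: a materialized view whose inner table is missing (or concurrently dropped) used to report 0 rows and 0 bytes; it now reports "unknown" by returning an empty optional, and tryGetTargetTable() avoids throwing when the target has gone away. A sketch of the delegation with stubbed types:

#include <cstdint>
#include <memory>
#include <optional>

/// Stand-ins for IStorage and StorageMaterializedView.
struct TableStub { std::optional<uint64_t> totalRows() const { return 42; } };

struct ViewStub
{
    bool hasInnerTable() const { return inner != nullptr; }
    std::shared_ptr<TableStub> tryGetTargetTable() const { return inner; } /// may be null
    std::shared_ptr<TableStub> inner;
};

std::optional<uint64_t> totalRows(const ViewStub & view)
{
    if (view.hasInnerTable())
        if (auto table = view.tryGetTargetTable())
            return table->totalRows();
    return {}; /// unknown, rather than a misleading 0
}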

View File

@@ -239,6 +239,8 @@ public:
/// Save the new file sizes.
storage.saveFileSizes(lock);
storage.updateTotalRows(lock);
done = true;
/// unlock should be done from the same thread as lock, and dtor may be
@@ -310,6 +312,8 @@ StorageStripeLog::StorageStripeLog(
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
total_bytes = file_checker.getTotalSize();
}
@@ -331,8 +335,9 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
}
static std::chrono::seconds getLockTimeout(const Settings & settings)
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
Int64 lock_timeout = settings.lock_acquire_timeout.totalSeconds();
if (settings.max_execution_time.totalSeconds() != 0 && settings.max_execution_time.totalSeconds() < lock_timeout)
lock_timeout = settings.max_execution_time.totalSeconds();
@@ -340,12 +345,6 @@ static std::chrono::seconds getLockTimeout(const Settings & settings)
}
static std::chrono::seconds getLockTimeout(ContextPtr context)
{
return getLockTimeout(context->getSettingsRef());
}
Pipe StorageStripeLog::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@@ -429,7 +428,7 @@ void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
}
void StorageStripeLog::loadIndices(std::chrono::seconds lock_timeout) const
void StorageStripeLog::loadIndices(std::chrono::seconds lock_timeout)
{
if (indices_loaded)
return;
@@ -444,7 +443,7 @@ void StorageStripeLog::loadIndices(std::chrono::seconds lock_timeout) const
}
void StorageStripeLog::loadIndices(const WriteLock & /* already locked exclusively */) const
void StorageStripeLog::loadIndices(const WriteLock & lock /* already locked exclusively */)
{
if (indices_loaded)
return;
@@ -457,6 +456,9 @@ void StorageStripeLog::loadIndices(const WriteLock & /* already locked exclusively */) const
indices_loaded = true;
num_indices_saved = indices.blocks.size();
/// We need indices to calculate the number of rows, and now we have the indices.
updateTotalRows(lock);
}
@@ -493,6 +495,35 @@ void StorageStripeLog::saveFileSizes(const WriteLock & /* already locked for writing */)
file_checker.update(data_file_path);
file_checker.update(index_file_path);
file_checker.save();
total_bytes = file_checker.getTotalSize();
}
void StorageStripeLog::updateTotalRows(const WriteLock &)
{
if (!indices_loaded)
return;
size_t new_total_rows = 0;
for (const auto & block : indices.blocks)
new_total_rows += block.num_rows;
total_rows = new_total_rows;
}
std::optional<UInt64> StorageStripeLog::totalRows(const Settings &) const
{
if (indices_loaded)
return total_rows;
if (!total_bytes)
return 0;
return {};
}
std::optional<UInt64> StorageStripeLog::totalBytes(const Settings &) const
{
return total_bytes;
}
@@ -623,6 +654,7 @@ public:
/// Finish writing.
storage->saveIndices(lock);
storage->saveFileSizes(lock);
storage->updateTotalRows(lock);
return {};
}
catch (...)
@@ -652,31 +684,6 @@ RestoreTaskPtr StorageStripeLog::restoreData(ContextMutablePtr context, const ASTs & partitions,
}
std::optional<UInt64> StorageStripeLog::totalRows(const Settings & settings) const
{
auto lock_timeout = getLockTimeout(settings);
loadIndices(lock_timeout);
ReadLock lock{rwlock, lock_timeout};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
size_t total_rows = 0;
for (const auto & block : indices.blocks)
total_rows += block.num_rows;
return total_rows;
}
std::optional<UInt64> StorageStripeLog::totalBytes(const Settings & settings) const
{
ReadLock lock{rwlock, getLockTimeout(settings)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
return file_checker.getTotalSize();
}
void registerStorageStripeLog(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
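Unlike StorageLog, StripeLog has no marks file; its row count comes from the native-format index, so updateTotalRows() sums the per-block row counts. A standalone equivalent, assuming a vector of blocks each carrying a num_rows field:

#include <cstdint>
#include <vector>

struct IndexBlockStub { uint64_t num_rows = 0; }; /// stand-in for IndexForNativeFormat's blocks

/// Mirrors the loop in StorageStripeLog::updateTotalRows.
uint64_t computeTotalRows(const std::vector<IndexBlockStub> & blocks)
{
    uint64_t total = 0;
    for (const auto & block : blocks)
        total += block.num_rows;
    return total;
}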

View File

@@ -52,13 +52,13 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder&) override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
bool hasDataToBackup() const override { return true; }
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
protected:
StorageStripeLog(
DiskPtr disk_,
@@ -76,8 +76,8 @@ private:
/// Reads the index file if it hasn't read yet.
/// It is done lazily, so that with a large number of tables, the server starts quickly.
void loadIndices(std::chrono::seconds lock_timeout) const;
void loadIndices(const WriteLock &) const;
void loadIndices(std::chrono::seconds lock_timeout);
void loadIndices(const WriteLock &);
/// Saves the index file.
void saveIndices(const WriteLock &);
@@ -88,6 +88,9 @@ private:
/// Saves the sizes of the data and index files.
void saveFileSizes(const WriteLock &);
/// Recalculates the number of rows stored in this table.
void updateTotalRows(const WriteLock &);
const DiskPtr disk;
String table_path;
@@ -95,9 +98,12 @@ private:
String index_file_path;
FileChecker file_checker;
mutable IndexForNativeFormat indices;
mutable std::atomic<bool> indices_loaded = false;
mutable size_t num_indices_saved = 0;
IndexForNativeFormat indices;
std::atomic<bool> indices_loaded = false;
size_t num_indices_saved = 0;
std::atomic<UInt64> total_rows = 0;
std::atomic<UInt64> total_bytes = 0;
const size_t max_compress_block_size;

View File

@@ -25,8 +25,14 @@
│ Counter │ 0 │ 1 │ 1 │ 0 │
└─────────┴─────────────────────┴───────────────────┴───────────────────┴────────────────────┘
Check total_bytes/total_rows for TinyLog
\N \N
\N \N
0 0
27 \N
Check total_bytes/total_rows for Log
0 0
43 1
Check total_bytes/total_rows for StripeLog
0 0
113 1
Check total_bytes/total_rows for Memory
0 0
64 1

View File

@@ -73,6 +73,20 @@ INSERT INTO check_system_tables VALUES (1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Log';
CREATE TABLE check_system_tables (key UInt8) ENGINE = Log();
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
INSERT INTO check_system_tables VALUES (1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for StripeLog';
CREATE TABLE check_system_tables (key UInt8) ENGINE = StripeLog();
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
INSERT INTO check_system_tables VALUES (1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Memory';
CREATE TABLE check_system_tables (key UInt16) ENGINE = Memory();
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables' AND database = currentDatabase();