in-memory parts: better restore from wal

This commit is contained in:
Anton Popov 2020-05-05 18:06:16 +03:00
parent 14e8592e47
commit 4878c91d07
10 changed files with 88 additions and 71 deletions

View File

@ -90,7 +90,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
LOG_TRACE(log, "Sending part " << part_name);
try
{
{
auto storage_lock = data.lockStructureForShare(
false, RWLockImpl::NO_QUERY, data.getSettings()->lock_acquire_timeout_for_background_operations);
@ -292,19 +292,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.minmax_idx_columns);
auto partition_block = block;
data.partition_key_expr->execute(partition_block);
auto & partition = new_data_part->partition.value;
size_t partition_columns_num = data.partition_key_sample.columns();
partition.resize(partition_columns_num);
for (size_t i = 0; i < partition_columns_num; ++i)
{
const auto & column_name = data.partition_key_sample.getByPosition(i).name;
const auto & partition_column = partition_block.getByName(column_name).column;
partition[i] = (*partition_column)[0];
}
new_data_part->partition.create(data, block, 0);
MergedBlockOutputStream part_out(new_data_part, block.getNamesAndTypesList(), {}, nullptr);
part_out.writePrefix();

View File

@ -354,6 +354,7 @@ private:
using MergeTreeDataPartState = IMergeTreeDataPart::State;
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);

View File

@ -248,8 +248,8 @@ MergeTreeData::MergeTreeData(
if (settings->in_memory_parts_enable_wal)
{
auto disk = reserveSpace(0)->getDisk();
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk);
auto disk = makeEmptyReservationOnLargestDisk()->getDisk();
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, std::move(disk));
}
}
@ -859,6 +859,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
const auto settings = getSettings();
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
MutableDataPartsVector parts_from_wal;
Strings part_file_names;
auto disks = getStoragePolicy()->getDisks();
@ -899,19 +900,23 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
part_names_with_disks.emplace_back(it->name(), disk_ptr);
if (startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
loadDataPartsFromWAL(disk_ptr, it->name());
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
auto current_parts = wal.restore();
for (auto & part : current_parts)
parts_from_wal.push_back(std::move(part));
}
}
}
auto part_lock = lockParts();
// TODO: fix.
// data_parts_indexes.clear();
data_parts_indexes.clear();
// if (part_names_with_disks.empty())
// {
// LOG_DEBUG(log, "There is no data parts");
// return;
// }
if (part_names_with_disks.empty() && parts_from_wal.empty())
{
LOG_DEBUG(log, "There is no data parts");
return;
}
/// Parallel loading of data parts.
size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size());
@ -1043,6 +1048,16 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
pool.wait();
for (auto & part : parts_from_wal)
{
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
@ -1110,21 +1125,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
}
void MergeTreeData::loadDataPartsFromWAL(const DiskPtr & disk, const String & file_name)
{
MergeTreeWriteAheadLog wal(*this, disk, file_name);
auto parts = wal.restore();
for (auto & part : parts)
{
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
}
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it is less then threshold.

View File

@ -370,7 +370,6 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
void loadDataPartsFromWAL(const DiskPtr & disk, const String & file_name);
String getLogName() const { return log_name; }

View File

@ -152,4 +152,18 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
}
void MergeTreePartition::create(const MergeTreeData & storage, Block block, size_t row)
{
storage.partition_key_expr->execute(block);
size_t partition_columns_num = storage.partition_key_sample.columns();
value.resize(partition_columns_num);
for (size_t i = 0; i < partition_columns_num; ++i)
{
const auto & column_name = storage.partition_key_sample.getByPosition(i).name;
const auto & partition_column = block.getByName(column_name).column;
partition_column->get(row, value[i]);
}
}
}

View File

@ -36,6 +36,8 @@ public:
void store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void assign(const MergeTreePartition & other) { value.assign(other.value); }
void create(const MergeTreeData & storage, Block block, size_t row);
};
}

View File

@ -33,7 +33,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \
M(SettingUInt64, min_bytes_for_compact_part, 0, "Minimal uncompressed size in bytes to create part in compact format instead of saving it in RAM", 0) \
M(SettingUInt64, min_rows_for_compact_part, 0, "Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
M(SettingBool, in_memory_parts_enable_wal, 0, "", 0) \
M(SettingBool, in_memory_parts_enable_wal, 1, "Whether to write blocks in Native format to write-ahead-log before creation in-memory part", 0) \
M(SettingBool, in_memory_parts_insert_sync, 0, "", 0) \
\
/** Merge settings. */ \

View File

@ -12,15 +12,6 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT_VERSION;
}
// WALBlockOutputStream::WALBlockOutputStream(WriteBuffer & out_, const Block & header_)
// : NativeBlockOutputStream(out_, 0, header_), out(out_) {}
// void WALBlockOutputStream::write(const Block & block, const String & part_name)
// {
// writeIntBinary(0, out);
// writeString(part_name, out);
// NativeBlockOutputStream::write(block);
// }
MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
const MergeTreeData & storage_,
@ -28,7 +19,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
const String & name)
: storage(storage_)
, disk(disk_)
, path(storage.getFullPathOnDisk(disk) + name)
, path(storage.getRelativeDataPath() + name)
{
init();
}
@ -93,8 +84,9 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
part_name);
auto block = block_in.read();
part->minmax_idx.update(block, storage.minmax_idx_columns);
part->partition.create(storage, block, 0);
MergedBlockOutputStream part_out(part, block.getNamesAndTypesList(), {}, nullptr);
part_out.writePrefix();
part_out.write(block);

View File

@ -8,20 +8,6 @@
namespace DB
{
// class WALBlockOutputStream : public NativeBlockOutputStream
// {
// public:
// WALBlockOutputStream(WriteBuffer & out_, const Block & header_);
// void write(const Block & block, const String & part_name);
// private:
// WriteBuffer & out;
// };
// class WALBlockInputStream : public NativeBlockInputStream
// {
// };
class MergeTreeData;
class MergeTreeWriteAheadLog
@ -29,13 +15,13 @@ class MergeTreeWriteAheadLog
public:
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static size_t MAX_WAL_BYTES = 1024;
constexpr static size_t MAX_WAL_BYTES = 1024 * 1024 * 1024;
MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_,
const String & name = String(WAL_FILE_NAME) + WAL_FILE_EXTENSION);
void write(const Block & block, const String & part_name);
std::vector<std::shared_ptr<IMergeTreeDataPart>> restore();
std::vector<MergeTreeMutableDataPartPtr> restore();
private:
void init();

View File

@ -39,7 +39,8 @@ def create_tables(name, nodes, node_settings, shard):
PARTITION BY toYYYYMM(date)
ORDER BY id
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part}
min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part},
in_memory_parts_enable_wal = 1
'''.format(name=name, shard=shard, repl=i, **settings))
def create_tables_old_format(name, nodes, shard):
@ -68,8 +69,8 @@ node6 = cluster.add_instance('node6', config_dir='configs', main_configs=['confi
settings_in_memory = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 256}
node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True)
node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True)
node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True, stay_alive=True)
node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True, stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
@ -83,6 +84,7 @@ def start_cluster():
create_tables('polymorphic_table_wide', [node3, node4], [settings_wide, settings_compact], "shard2")
create_tables_old_format('polymorphic_table', [node5, node6], "shard3")
create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
create_tables('wal_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
yield cluster
@ -314,6 +316,39 @@ def test_in_memory(start_cluster):
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(expected)
def test_in_memory_wal(start_cluster):
node9.query("SYSTEM STOP MERGES")
node10.query("SYSTEM STOP MERGES")
for i in range(5):
insert_random_data('wal_table', node9, 50)
node10.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
assert node9.query("SELECT count() FROM wal_table") == "250\n"
assert node10.query("SELECT count() FROM wal_table") == "250\n"
assert node9.query("SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == '5\n'
assert node10.query("SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == '5\n'
# WAL works at inserts
node9.restart_clickhouse(kill=True)
time.sleep(5)
assert node9.query("SELECT count() FROM wal_table") == "250\n"
# WAL works at fetches
node10.restart_clickhouse(kill=True)
time.sleep(5)
assert node10.query("SELECT count() FROM wal_table") == "250\n"
node9.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
insert_random_data('wal_table', node9, 50)
assert node9.query("SELECT count() FROM wal_table") == "300\n"
# Data is lost without WAL
node9.restart_clickhouse(kill=True)
time.sleep(5)
assert node9.query("SELECT count() FROM wal_table") == "250\n"
def test_polymorphic_parts_index(start_cluster):
node1.query('''
CREATE TABLE index_compact(a UInt32, s String)