diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index bbee34e4a0b..673c774ce5a 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -90,7 +90,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
     LOG_TRACE(log, "Sending part " << part_name);
 
     try
-    {
+    {
         auto storage_lock = data.lockStructureForShare(
             false, RWLockImpl::NO_QUERY, data.getSettings()->lock_acquire_timeout_for_background_operations);
@@ -292,19 +292,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     new_data_part->is_temp = true;
     new_data_part->setColumns(block.getNamesAndTypesList());
     new_data_part->minmax_idx.update(block, data.minmax_idx_columns);
-
-    auto partition_block = block;
-    data.partition_key_expr->execute(partition_block);
-    auto & partition = new_data_part->partition.value;
-    size_t partition_columns_num = data.partition_key_sample.columns();
-    partition.resize(partition_columns_num);
-
-    for (size_t i = 0; i < partition_columns_num; ++i)
-    {
-        const auto & column_name = data.partition_key_sample.getByPosition(i).name;
-        const auto & partition_column = partition_block.getByName(column_name).column;
-        partition[i] = (*partition_column)[0];
-    }
+    new_data_part->partition.create(data, block, 0);
 
     MergedBlockOutputStream part_out(new_data_part, block.getNamesAndTypesList(), {}, nullptr);
     part_out.writePrefix();
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index b1fb2554c76..8943a9fcb1f 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -354,6 +354,7 @@ private:
 
 using MergeTreeDataPartState = IMergeTreeDataPart::State;
 using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
+using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
 
 bool isCompactPart(const MergeTreeDataPartPtr & data_part);
 bool isWidePart(const MergeTreeDataPartPtr & data_part);
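Note: the one-line `IMergeTreeDataPart.h` change above is load-bearing. `std::shared_ptr<T>` converts implicitly to `std::shared_ptr<const T>` but never back, so code that still has to mutate freshly restored parts (state, modification time) must hold the new mutable alias. A minimal sketch of that one-way conversion, with a stand-in `Part` type instead of the real `IMergeTreeDataPart`:

```cpp
#include <memory>

struct Part                                     /// Stand-in for IMergeTreeDataPart.
{
    int state = 0;
};

using PartPtr = std::shared_ptr<const Part>;    /// Read-only handle, like MergeTreeDataPartPtr.
using MutablePartPtr = std::shared_ptr<Part>;   /// Writable handle, like MergeTreeMutableDataPartPtr.

int main()
{
    MutablePartPtr restored = std::make_shared<Part>();
    restored->state = 1;              /// Mutation is fine while we hold the mutable handle.

    PartPtr published = restored;     /// shared_ptr<T> -> shared_ptr<const T>: implicit and one-way.
    /// published->state = 2;         /// Would not compile: the published handle is read-only.

    return published->state == 1 ? 0 : 1;
}
```

This is also why `MergeTreeWriteAheadLog::restore()` below switches to returning the mutable alias.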
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 9fd80765f0e..67a30934e2c 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -248,8 +248,8 @@ MergeTreeData::MergeTreeData(
 
     if (settings->in_memory_parts_enable_wal)
     {
-        auto disk = reserveSpace(0)->getDisk();
-        write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk);
+        auto disk = makeEmptyReservationOnLargestDisk()->getDisk();
+        write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, std::move(disk));
     }
 }
@@ -859,6 +859,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
     const auto settings = getSettings();
     std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
+    MutableDataPartsVector parts_from_wal;
     Strings part_file_names;
 
     auto disks = getStoragePolicy()->getDisks();
@@ -899,19 +900,23 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
             part_names_with_disks.emplace_back(it->name(), disk_ptr);
 
             if (startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
-                loadDataPartsFromWAL(disk_ptr, it->name());
+            {
+                MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
+                auto current_parts = wal.restore();
+                for (auto & part : current_parts)
+                    parts_from_wal.push_back(std::move(part));
+            }
         }
     }
 
     auto part_lock = lockParts();
-    // TODO: fix.
-    // data_parts_indexes.clear();
+    data_parts_indexes.clear();
 
-    // if (part_names_with_disks.empty())
-    // {
-    //     LOG_DEBUG(log, "There is no data parts");
-    //     return;
-    // }
+    if (part_names_with_disks.empty() && parts_from_wal.empty())
+    {
+        LOG_DEBUG(log, "There are no data parts");
+        return;
+    }
 
     /// Parallel loading of data parts.
     size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size());
@@ -1043,6 +1048,16 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
 
     pool.wait();
 
+    for (auto & part : parts_from_wal)
+    {
+        part->modification_time = time(nullptr);
+        /// Assume that all parts are Committed; covered parts will be detected and marked as Outdated later.
+        part->state = DataPartState::Committed;
+
+        if (!data_parts_indexes.insert(part).second)
+            throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
+    }
+
     if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
         throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
@@ -1110,21 +1125,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
     LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
 }
 
-void MergeTreeData::loadDataPartsFromWAL(const DiskPtr & disk, const String & file_name)
-{
-    MergeTreeWriteAheadLog wal(*this, disk, file_name);
-    auto parts = wal.restore();
-    for (auto & part : parts)
-    {
-        part->modification_time = time(nullptr);
-        /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
-        part->state = DataPartState::Committed;
-
-        if (!data_parts_indexes.insert(part).second)
-            throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
-    }
-}
-
 /// Is the part directory old.
 /// True if its modification time and the modification time of all files inside it is less then threshold.
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 618668b0d87..4ae1a4bb0cb 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -370,7 +370,6 @@ public:
 
     /// Load the set of data parts from disk. Call once - immediately after the object is created.
     void loadDataParts(bool skip_sanity_checks);
-    void loadDataPartsFromWAL(const DiskPtr & disk, const String & file_name);
 
     String getLogName() const { return log_name; }
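Note: parts recovered from the WAL are now committed through the same `data_parts_indexes.insert(...).second` duplicate check as parts loaded from disk, instead of a separate helper. A sketch of that insert-or-throw pattern, assuming a simplified index keyed only by part name (the real container is a multi-index that also orders parts by state):

```cpp
#include <ctime>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

struct Part
{
    std::string name;
    enum class State { Temporary, Committed, Outdated };
    State state = State::Temporary;
    time_t modification_time = 0;
};

using MutablePartPtr = std::shared_ptr<Part>;

int main()
{
    /// Stand-in for data_parts_indexes.
    std::map<std::string, MutablePartPtr> index;

    auto add_restored_part = [&](MutablePartPtr part)
    {
        part->modification_time = time(nullptr);
        part->state = Part::State::Committed;   /// Assume Committed; covered parts are demoted later.
        if (!index.emplace(part->name, part).second)
            throw std::runtime_error("Part " + part->name + " already exists");
    };

    auto make_part = [](std::string name)
    {
        auto part = std::make_shared<Part>();
        part->name = std::move(name);
        return part;
    };

    add_restored_part(make_part("all_1_1_0"));
    add_restored_part(make_part("all_2_2_0"));
    /// A second "all_1_1_0" would throw here, mirroring the DUPLICATE_DATA_PART check.
    return index.size() == 2 ? 0 : 1;
}
```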
diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp
index 000d0abad43..3124b16a138 100644
--- a/src/Storages/MergeTree/MergeTreePartition.cpp
+++ b/src/Storages/MergeTree/MergeTreePartition.cpp
@@ -152,4 +152,18 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
     checksums.files["partition.dat"].file_hash = out_hashing.getHash();
 }
 
+void MergeTreePartition::create(const MergeTreeData & storage, Block block, size_t row)
+{
+    storage.partition_key_expr->execute(block);
+    size_t partition_columns_num = storage.partition_key_sample.columns();
+    value.resize(partition_columns_num);
+
+    for (size_t i = 0; i < partition_columns_num; ++i)
+    {
+        const auto & column_name = storage.partition_key_sample.getByPosition(i).name;
+        const auto & partition_column = block.getByName(column_name).column;
+        partition_column->get(row, value[i]);
+    }
+}
+
 }
diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h
index 2a589339ba8..d91022f655f 100644
--- a/src/Storages/MergeTree/MergeTreePartition.h
+++ b/src/Storages/MergeTree/MergeTreePartition.h
@@ -36,6 +36,8 @@ public:
     void store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
 
     void assign(const MergeTreePartition & other) { value.assign(other.value); }
+
+    void create(const MergeTreeData & storage, Block block, size_t row);
 };
 
 }
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 20010eb8f4c..68e240f9d7e 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -33,7 +33,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
     M(SettingUInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \
     M(SettingUInt64, min_bytes_for_compact_part, 0, "Minimal uncompressed size in bytes to create part in compact format instead of saving it in RAM", 0) \
     M(SettingUInt64, min_rows_for_compact_part, 0, "Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
-    M(SettingBool, in_memory_parts_enable_wal, 0, "", 0) \
+    M(SettingBool, in_memory_parts_enable_wal, 1, "Whether to write blocks in Native format to the write-ahead log before creating an in-memory part", 0) \
    M(SettingBool, in_memory_parts_insert_sync, 0, "", 0) \
    \
    /** Merge settings. */ \
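Note: `MergeTreePartition::create` centralizes what the fetch path and the WAL restore path previously duplicated by hand: run the partition key expression over the block, then copy each key column's value at one row. Taking `block` by value mirrors the real signature, since the key expression appends derived columns to a local copy. A standalone sketch of that shape, with a map of integer vectors standing in for `Block`, `ColumnPtr`, and `Field`:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

using Column = std::vector<int64_t>;
using Block = std::map<std::string, Column>;    /// Stand-in: named integer columns.

int main()
{
    Block block;
    block["date"] = {20200401, 20200402, 20200515};   /// yyyymmdd, for brevity
    block["id"]   = {1, 2, 3};

    /// partition_key_expr->execute(block) for PARTITION BY (toYYYYMM(date), id % 10):
    /// the expression materializes one derived column per key element.
    Column yyyymm, id_mod;
    for (auto d : block["date"]) yyyymm.push_back(d / 100);
    for (auto i : block["id"])   id_mod.push_back(i % 10);
    block["toYYYYMM(date)"] = yyyymm;
    block["modulo(id, 10)"] = id_mod;

    /// The create(storage, block, row) loop: read every key column at one row.
    const std::vector<std::string> key_names = {"toYYYYMM(date)", "modulo(id, 10)"};
    size_t row = 0;
    std::vector<int64_t> value(key_names.size());
    for (size_t i = 0; i < key_names.size(); ++i)
        value[i] = block[key_names[i]][row];          /// Like partition_column->get(row, value[i]).

    std::cout << value[0] << ", " << value[1] << '\n';  /// 202004, 1
}
```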
diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp
index 84091f904e6..e5c0c370ae2 100644
--- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp
+++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp
@@ -12,15 +12,6 @@ namespace ErrorCodes
     extern const int UNKNOWN_FORMAT_VERSION;
 }
 
-// WALBlockOutputStream::WALBlockOutputStream(WriteBuffer & out_, const Block & header_)
-//     : NativeBlockOutputStream(out_, 0, header_), out(out_) {}
-
-// void WALBlockOutputStream::write(const Block & block, const String & part_name)
-// {
-//     writeIntBinary(0, out);
-//     writeString(part_name, out);
-//     NativeBlockOutputStream::write(block);
-// }
 
 MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
     const MergeTreeData & storage_,
@@ -28,7 +19,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
     const String & name)
     : storage(storage_)
     , disk(disk_)
-    , path(storage.getFullPathOnDisk(disk) + name)
+    , path(storage.getRelativeDataPath() + name)
 {
     init();
 }
@@ -93,8 +84,9 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
                 part_name);
 
             auto block = block_in.read();
-
             part->minmax_idx.update(block, storage.minmax_idx_columns);
+            part->partition.create(storage, block, 0);
+
             MergedBlockOutputStream part_out(part, block.getNamesAndTypesList(), {}, nullptr);
             part_out.writePrefix();
             part_out.write(block);
diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h
index 50bb9aa5e13..7a0e5759624 100644
--- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h
+++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h
@@ -8,20 +8,6 @@
 namespace DB
 {
 
-// class WALBlockOutputStream : public NativeBlockOutputStream
-// {
-// public:
-//     WALBlockOutputStream(WriteBuffer & out_, const Block & header_);
-//     void write(const Block & block, const String & part_name);
-
-// private:
-//     WriteBuffer & out;
-// };
-
-// class WALBlockInputStream : public NativeBlockInputStream
-// {
-// };
-
 class MergeTreeData;
 
 class MergeTreeWriteAheadLog
@@ -29,13 +15,13 @@ class MergeTreeWriteAheadLog
 public:
     constexpr static auto WAL_FILE_NAME = "wal";
     constexpr static auto WAL_FILE_EXTENSION = ".bin";
-    constexpr static size_t MAX_WAL_BYTES = 1024;
+    constexpr static size_t MAX_WAL_BYTES = 1024 * 1024 * 1024;
 
     MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_,
         const String & name = String(WAL_FILE_NAME) + WAL_FILE_EXTENSION);
 
     void write(const Block & block, const String & part_name);
-    std::vector<std::shared_ptr<IMergeTreeDataPart>> restore();
+    std::vector<MergeTreeMutableDataPartPtr> restore();
 
 private:
     void init();
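Note: the commented-out streams deleted above still document the record framing the WAL uses: a small integer (presumably a format version, given the `UNKNOWN_FORMAT_VERSION` error code), then the part name, then the block in Native format, repeated until EOF. A self-contained sketch of a log with that framing, assuming a plain string payload in place of a Native-encoded block:

```cpp
#include <cstdint>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <string>

constexpr uint8_t WAL_VERSION = 0;

struct Record { std::string part_name; std::string payload; };  /// payload stands in for a Native block

static void writeString(std::ofstream & out, const std::string & s)
{
    uint64_t n = s.size();
    out.write(reinterpret_cast<const char *>(&n), sizeof(n));   /// length-prefixed string
    out.write(s.data(), n);
}

static bool readString(std::ifstream & in, std::string & s)
{
    uint64_t n = 0;
    if (!in.read(reinterpret_cast<char *>(&n), sizeof(n)))
        return false;
    s.resize(n);
    in.read(s.data(), n);
    return static_cast<bool>(in);
}

int main()
{
    {
        std::ofstream out("wal.bin", std::ios::binary);
        for (const Record & r : {Record{"all_1_1_0", "block-1"}, Record{"all_2_2_0", "block-2"}})
        {
            out.put(static_cast<char>(WAL_VERSION));    /// like writeIntBinary(0, out)
            writeString(out, r.part_name);              /// like writeString(part_name, out)
            writeString(out, r.payload);                /// like NativeBlockOutputStream::write(block)
        }
    }

    std::ifstream in("wal.bin", std::ios::binary);
    for (int c; (c = in.get()) != EOF;)                 /// restore(): read records until clean EOF
    {
        if (c != WAL_VERSION)
            throw std::runtime_error("Unknown WAL format version");
        Record r;
        if (!readString(in, r.part_name) || !readString(in, r.payload))
            throw std::runtime_error("Truncated WAL record");
        std::cout << "restored " << r.part_name << " (" << r.payload << ")\n";
    }
}
```

The `MAX_WAL_BYTES` bump from 1 KiB (a debug-sized value) to 1 GiB sets the cap that such a log is allowed to grow to.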
diff --git a/tests/integration/test_polymorphic_parts/test.py b/tests/integration/test_polymorphic_parts/test.py
index 1cd917a12bb..8cca8aa1072 100644
--- a/tests/integration/test_polymorphic_parts/test.py
+++ b/tests/integration/test_polymorphic_parts/test.py
@@ -39,7 +39,8 @@ def create_tables(name, nodes, node_settings, shard):
             PARTITION BY toYYYYMM(date)
             ORDER BY id
             SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
-            min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part}
+            min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part},
+            in_memory_parts_enable_wal = 1
             '''.format(name=name, shard=shard, repl=i, **settings))
 
 def create_tables_old_format(name, nodes, shard):
@@ -68,8 +69,8 @@ node6 = cluster.add_instance('node6', config_dir='configs', main_configs=['confi
 settings_in_memory = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 256}
 
-node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True)
-node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True)
+node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True, stay_alive=True)
+node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True, stay_alive=True)
 
 @pytest.fixture(scope="module")
 def start_cluster():
@@ -83,6 +84,7 @@ def start_cluster():
         create_tables('polymorphic_table_wide', [node3, node4], [settings_wide, settings_compact], "shard2")
         create_tables_old_format('polymorphic_table', [node5, node6], "shard3")
         create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
+        create_tables('wal_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
 
         yield cluster
@@ -314,6 +316,39 @@ def test_in_memory(start_cluster):
     assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
         "WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(expected)
 
+def test_in_memory_wal(start_cluster):
+    node9.query("SYSTEM STOP MERGES")
+    node10.query("SYSTEM STOP MERGES")
+
+    for i in range(5):
+        insert_random_data('wal_table', node9, 50)
+    node10.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
+
+    assert node9.query("SELECT count() FROM wal_table") == "250\n"
+    assert node10.query("SELECT count() FROM wal_table") == "250\n"
+
+    assert node9.query("SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == '5\n'
+    assert node10.query("SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == '5\n'
+
+    # WAL works for inserts
+    node9.restart_clickhouse(kill=True)
+    time.sleep(5)
+    assert node9.query("SELECT count() FROM wal_table") == "250\n"
+
+    # WAL works for fetches
+    node10.restart_clickhouse(kill=True)
+    time.sleep(5)
+    assert node10.query("SELECT count() FROM wal_table") == "250\n"
+
+    node9.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
+    insert_random_data('wal_table', node9, 50)
+    assert node9.query("SELECT count() FROM wal_table") == "300\n"
+
+    # Data is lost without WAL
+    node9.restart_clickhouse(kill=True)
+    time.sleep(5)
+    assert node9.query("SELECT count() FROM wal_table") == "250\n"
+
 def test_polymorphic_parts_index(start_cluster):
     node1.query('''
         CREATE TABLE index_compact(a UInt32, s String)