Merge pull request #56103 from ClickHouse/revert-55995-buf

Revert "Fix 'Block structure mismatch' on concurrent ALTER and INSERTs in Buffer table"
This commit is contained in:
Alexey Milovidov 2023-10-29 02:34:14 +01:00 committed by GitHub
commit f201b7bc79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 8 additions and 67 deletions

View File

@ -165,8 +165,7 @@ public:
: ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
, column_names_and_types(storage_snapshot->getColumnsByNames(
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns(), column_names_))
, buffer(buffer_)
, metadata_version(storage_snapshot->metadata->metadata_version) {}
, buffer(buffer_) {}
String getName() const override { return "Buffer"; }
@ -181,7 +180,7 @@ protected:
std::unique_lock lock(buffer.lockForReading());
if (!buffer.data.rows() || buffer.metadata_version != metadata_version)
if (!buffer.data.rows())
return res;
Columns columns;
@ -199,7 +198,6 @@ protected:
private:
NamesAndTypesList column_names_and_types;
StorageBuffer::Buffer & buffer;
int32_t metadata_version;
bool has_been_read = false;
};
@ -617,7 +615,7 @@ public:
least_busy_buffer = &storage.buffers[start_shard_num];
least_busy_lock = least_busy_buffer->lockForWriting();
}
insertIntoBuffer(block, *least_busy_buffer, metadata_snapshot->metadata_version);
insertIntoBuffer(block, *least_busy_buffer);
least_busy_lock.unlock();
storage.reschedule();
@ -626,15 +624,14 @@ private:
StorageBuffer & storage;
StorageMetadataPtr metadata_snapshot;
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, int32_t metadata_version)
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
{
time_t current_time = time(nullptr);
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
Block sorted_block = block.sortColumns();
if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()) ||
buffer.metadata_version != metadata_version)
if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
@ -642,7 +639,6 @@ private:
*/
storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
buffer.metadata_version = metadata_version;
}
if (!buffer.first_write_time)
@ -1066,12 +1062,13 @@ void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context
checkAlterIsPossible(params, local_context);
auto metadata_snapshot = getInMemoryMetadataPtr();
/// Flush buffers to the storage because BufferSource skips buffers with old metadata_version.
/// Flush all buffers to storages, so that no non-empty blocks of the old
/// structure remain. Structure of empty blocks will be updated during first
/// insert.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, local_context);
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
params.apply(new_metadata, local_context);
new_metadata.metadata_version += 1;
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
}

View File

@ -128,18 +128,6 @@ private:
time_t first_write_time = 0;
Block data;
/// Schema version, checked to avoid mixing blocks with different sets of columns, from
/// before and after an ALTER. There are some remaining mild problems if an ALTER happens
/// in the middle of a long-running INSERT:
/// * The data produced by the INSERT after the ALTER is not visible to SELECTs until flushed.
/// That's because BufferSource skips buffers with old metadata_version instead of converting
/// them to the latest schema, for simplicity.
/// * If there are concurrent INSERTs, some of which started before the ALTER and some started
/// after, then the buffer's metadata_version will oscillate back and forth between the two
/// schemas, flushing the buffer each time. This is probably fine because long-running INSERTs
/// usually don't produce lots of small blocks.
int32_t metadata_version = 0;
std::unique_lock<std::mutex> lockForReading() const;
std::unique_lock<std::mutex> lockForWriting() const;
std::unique_lock<std::mutex> tryLock() const;

View File

@ -1,10 +0,0 @@
0
1
2 bobr
0
1
1
1
1
1
2 bobr

View File

@ -1,34 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists 02900_buffer"
$CLICKHOUSE_CLIENT -q "drop table if exists 02900_destination"
$CLICKHOUSE_CLIENT -q "create table 02900_destination (k Int8, v String) engine Memory"
$CLICKHOUSE_CLIENT -q "create table 02900_buffer (k Int8) engine Buffer(currentDatabase(), '02900_destination', 1, 1000, 1000, 10000, 10000, 1000000, 1000000)"
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k) select 0"
# Start a long-running INSERT that uses the old schema.
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k) select sleepEachRow(1)+1 from numbers(5) settings max_block_size=1, max_insert_block_size=1, min_insert_block_size_rows=0, min_insert_block_size_bytes=0" &
sleep 1
$CLICKHOUSE_CLIENT -q "alter table 02900_buffer add column v String"
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k, v) select 2, 'bobr'"
wait
# The data produced by the long-running INSERT after the ALTER is not visible until flushed.
$CLICKHOUSE_CLIENT -q "select k, any(v) from 02900_buffer group by k order by k"
$CLICKHOUSE_CLIENT -q "optimize table 02900_buffer"
$CLICKHOUSE_CLIENT -q "select * from 02900_buffer order by k"
$CLICKHOUSE_CLIENT -q "drop table 02900_buffer"
$CLICKHOUSE_CLIENT -q "drop table 02900_destination"