Fix tests

This commit is contained in:
Maksim Kita 2021-03-05 17:12:50 +03:00
parent 64e8f09649
commit 10c5518988
3 changed files with 94 additions and 70 deletions

View File

@ -177,11 +177,7 @@ static inline void insertDefaultValuesIntoColumns(
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(column_index);
if (fetch_request.shouldFillResultColumnWithIndex(column_index))
{
std::cerr << "insertDefaultValuesIntoColumns" << default_value_provider.getDefaultValue(row_index).dump() << std::endl;
column->insert(default_value_provider.getDefaultValue(row_index));
}
}
}

View File

@ -879,7 +879,7 @@ public:
throw Exception("Method insertColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertDefaultKeys(const PaddedPODArray<StringRef> & keys) override
void insertDefaultKeys(const PaddedPODArray<StringRef> & keys) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
insertDefaultKeysImpl(keys);
@ -970,67 +970,68 @@ private:
const auto * it = index.find(key);
if (it)
if (!it)
{
const auto & cell = it->getMapped();
++result.not_found_keys_size;
continue;
}
bool has_deadline = cellHasDeadline(cell);
const auto & cell = it->getMapped();
if (has_deadline && now > cell.deadline + strict_max_lifetime_seconds)
bool has_deadline = cellHasDeadline(cell);
if (has_deadline && now > cell.deadline + strict_max_lifetime_seconds)
{
++result.not_found_keys_size;
continue;
}
bool cell_is_expired = false;
KeyState::State key_state = KeyState::found;
if (has_deadline && now > cell.deadline)
{
cell_is_expired = true;
key_state = KeyState::expired;
}
result.expired_keys_size += cell_is_expired;
result.found_keys_size += !cell_is_expired;
switch (cell.state)
{
case Cell::in_memory:
{
++result.not_found_keys_size;
continue;
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
++fetched_columns_index;
const auto & partition = memory_buffer_partitions[cell.in_memory_partition_index];
char * serialized_columns_place = partition.getPlace(cell.index);
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, serialized_columns_place);
break;
}
bool cell_is_expired = false;
KeyState::State key_state = KeyState::found;
if (has_deadline && now > cell.deadline)
case Cell::on_disk:
{
cell_is_expired = true;
key_state = KeyState::expired;
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, cell_is_expired);
if (!unique_blocks_to_request.contains(cell.index.block_index))
{
blocks_to_request.emplace_back(cell.index.block_index);
unique_blocks_to_request.insert(cell.index.block_index);
}
break;
}
result.expired_keys_size += cell_is_expired;
result.found_keys_size += !cell_is_expired;
switch (cell.state)
case Cell::default_value:
{
case Cell::in_memory:
{
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
++fetched_columns_index;
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
result.key_index_to_state[key_index].setDefault();
++fetched_columns_index;
++result.default_keys_size;
const auto & partition = memory_buffer_partitions[cell.in_memory_partition_index];
char * serialized_columns_place = partition.getPlace(cell.index);
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, serialized_columns_place);
break;
}
case Cell::on_disk:
{
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, cell_is_expired);
if (!unique_blocks_to_request.contains(cell.index.block_index))
{
blocks_to_request.emplace_back(cell.index.block_index);
unique_blocks_to_request.insert(cell.index.block_index);
}
break;
}
case Cell::default_value:
{
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
result.key_index_to_state[key_index].setDefault();
++fetched_columns_index;
++result.default_keys_size;
insertDefaultValuesIntoColumns(result.fetched_columns, fetch_request, key_index);
break;
}
insertDefaultValuesIntoColumns(result.fetched_columns, fetch_request, key_index);
break;
}
}
else
++result.not_found_keys_size;
}
/// Sort blocks by offset before start async io requests
@ -1119,9 +1120,10 @@ private:
Cell cell;
setCellDeadline(cell, now);
cell.index = {0, 0};
cell.in_memory_partition_index = 0;
cell.state = Cell::default_value;
cell.index = {0, 0};
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
@ -1157,9 +1159,20 @@ private:
void insertCell(SSDCacheKeyType & ssd_cache_key, Cell & cell)
{
SSDCacheIndex cache_index;
/** InsertCell has following flow
size_t loop_count = 0;
1. We try to write key into current memory buffer, if write succeeded then return.
2. Then if we does not write key into current memory buffer, we try to flush current memory buffer
to disk.
If flush succeeded then reset current memory buffer, write key into it and return.
If flush failed that means that current partition on disk is full, need to allocate new partition
or start reusing old ones.
Retry to step 1.
*/
SSDCacheIndex cache_index {0, 0};
while (true)
{
@ -1203,7 +1216,8 @@ private:
/// Check if key in index is key from old partition blocks
if (old_key_cell.isOnDisk() &&
old_key_block >= block_index_in_file_before_write && old_key_block <= file_read_end_block_index)
old_key_block >= block_index_in_file_before_write &&
old_key_block < file_read_end_block_index)
index.erase(old_key);
}
}
@ -1230,8 +1244,12 @@ private:
auto key_to_update = keys_to_update[i];
auto * it = index.find(key_to_update);
/// If lru cache does not contain old keys or there were duplicated keys in memory buffer partition
if (!it || updated_keys.contains(it->getKey()))
/// If there are not key to update or key to update not in memory
if (!it || it->getMapped().state != Cell::in_memory)
continue;
/// If there were duplicated keys in memory buffer partition
if (updated_keys.contains(it->getKey()))
continue;
updated_keys.insert(key_to_update);
@ -1264,9 +1282,7 @@ private:
{
/// Try tro create next partition without reusing old partitions
++current_partition_index;
file_buffer.allocateSizeForNextPartition();
memory_buffer_partitions.emplace_back(configuration.block_size, configuration.write_buffer_blocks_size);
}
else
@ -1276,8 +1292,6 @@ private:
file_buffer.reset();
}
}
++loop_count;
}
}
}

View File

@ -389,6 +389,13 @@ public:
if (dictionary_key_type == DictionaryKeyType::simple)
{
if (!WhichDataType(key_col_with_type.type).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Third argument of function ({}) must be uint64 when dictionary is simple. Actual type ({}).",
getName(),
key_col_with_type.type->getName());
if (attribute_names.size() > 1)
{
const auto & result_tuple_type = assert_cast<const DataTypeTuple &>(*result_type);
@ -413,11 +420,11 @@ public:
else if (dictionary_key_type == DictionaryKeyType::complex)
{
if (!isTuple(key_col_with_type.type))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Third argument of function ({}) must be tuple when dictionary is complex. Actual type ({}).",
getName(),
key_col_with_type.type->getName());
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Third argument of function ({}) must be tuple when dictionary is complex. Actual type ({}).",
getName(),
key_col_with_type.type->getName());
/// Functions in external dictionaries_loader only support full-value (not constant) columns with keys.
ColumnPtr key_column_full = key_col_with_type.column->convertToFullColumnIfConst();
@ -448,6 +455,13 @@ public:
}
else if (dictionary_key_type == DictionaryKeyType::range)
{
if (!WhichDataType(key_col_with_type.type).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Third argument of function ({}) must be uint64 when dictionary is range. Actual type ({}).",
getName(),
key_col_with_type.type->getName());
if (attribute_names.size() > 1)
{
const auto & result_tuple_type = assert_cast<const DataTypeTuple &>(*result_type);