Cleanup code and tests

This commit is contained in:
Larry Luo 2022-07-18 20:07:57 -07:00
parent 6429013b27
commit 657ce7c614
6 changed files with 28 additions and 30 deletions

View File

@ -113,7 +113,7 @@ public:
void moveFile(const String & from_name, const String & to_name) override; void moveFile(const String & from_name, const String & to_name) override;
void replaceFile(const String & from_name, const String & to_name) override; void replaceFile(const String & from_name, const String & to_name) override;
std::unique_ptr<WriteBufferFromFileBase> writeFile( std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path, const String & name,
size_t buf_size, size_t buf_size,
DB::WriteMode mode, DB::WriteMode mode,
const WriteSettings & settings) override; const WriteSettings & settings) override;

View File

@ -29,7 +29,7 @@ bool GinIndexPostingsBuilder::contains(UInt32 row_id) const
if (useRoaring()) if (useRoaring())
return bmp.contains(row_id); return bmp.contains(row_id);
auto it(std::find(lst.begin(), lst.begin()+lst_length, row_id)); const auto it(std::find(lst.begin(), lst.begin()+lst_length, row_id));
return it != lst.begin()+lst_length; return it != lst.begin()+lst_length;
} }
@ -116,8 +116,7 @@ GinIndexPostingsListPtr GinIndexPostingsBuilder::deserialize(ReadBuffer &buffer)
buffer.readStrict(reinterpret_cast<char*>(buf.get()), size); buffer.readStrict(reinterpret_cast<char*>(buf.get()), size);
GinIndexPostingsListPtr postings_list = std::shared_ptr<GinIndexPostingsList> GinIndexPostingsListPtr postings_list = std::make_shared<GinIndexPostingsList>(GinIndexPostingsList::read(buf.get()));
(new GinIndexPostingsList(GinIndexPostingsList::read(buf.get())));
return postings_list; return postings_list;
} }
@ -207,7 +206,7 @@ bool GinIndexStore::needToWrite() const
void GinIndexStore::finalize() void GinIndexStore::finalize()
{ {
if (current_postings.size() > 0) if (!current_postings.empty())
{ {
writeSegment(); writeSegment();
} }
@ -266,8 +265,8 @@ void GinIndexStore::writeSegment()
current_index = 0; current_index = 0;
for (const auto& [token, postings_list] : token_postings_list_pairs) for (const auto& [token, postings_list] : token_postings_list_pairs)
{ {
String strToken{token}; String str_token{token};
builder.add(strToken, offset); builder.add(str_token, offset);
offset += encoding_lengths[current_index++]; offset += encoding_lengths[current_index++];
} }
@ -315,7 +314,7 @@ void GinIndexStoreReader::readSegments()
{ {
init_file_streams(); init_file_streams();
} }
segment_file_stream->read(reinterpret_cast<char*>(&segments[0]), segment_num * sizeof(GinIndexSegment)); segment_file_stream->read(reinterpret_cast<char*>(segments.data()), segment_num * sizeof(GinIndexSegment));
for (size_t i = 0; i < segment_num; ++i) for (size_t i = 0; i < segment_num; ++i)
{ {
auto seg_id = segments[i].segment_id; auto seg_id = segments[i].segment_id;
@ -393,9 +392,9 @@ GinIndexStoreFactory& GinIndexStoreFactory::instance()
GinIndexStorePtr GinIndexStoreFactory::get(const String& name, DataPartStoragePtr storage_) GinIndexStorePtr GinIndexStoreFactory::get(const String& name, DataPartStoragePtr storage_)
{ {
const String& part_path_ = storage_->getRelativePath(); const String& part_path = storage_->getRelativePath();
std::lock_guard lock(stores_mutex); std::lock_guard lock(stores_mutex);
String key = name + String(":")+part_path_; String key = name + String(":")+part_path;
GinIndexStores::const_iterator it = stores.find(key); GinIndexStores::const_iterator it = stores.find(key);

View File

@ -1708,6 +1708,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
{ {
if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
granule = reader.read(); granule = reader.read();
const auto * gin_filter_condition = dynamic_cast<const MergeTreeConditionGinFilter *>(&*condition);
// Cast to Ann condition // Cast to Ann condition
auto ann_condition = std::dynamic_pointer_cast<ApproximateNearestNeighbour::IMergeTreeIndexConditionAnn>(condition); auto ann_condition = std::dynamic_pointer_cast<ApproximateNearestNeighbour::IMergeTreeIndexConditionAnn>(condition);
if (ann_condition != nullptr) if (ann_condition != nullptr)
@ -1733,7 +1734,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
} }
continue; continue;
} }
auto gin_filter_condition = dynamic_cast<const MergeTreeConditionGinFilter *>(&*condition);
bool result{false}; bool result{false};
if (!gin_filter_condition) if (!gin_filter_condition)

View File

@ -56,9 +56,9 @@ void MergeTreeIndexGranuleGinFilter::serializeBinary(WriteBuffer & ostr) const
for (const auto & gin_filter : gin_filters) for (const auto & gin_filter : gin_filters)
{ {
size_t filterSize = gin_filter.getFilter().size(); size_t filter_size = gin_filter.getFilter().size();
size_serialization->serializeBinary(filterSize, ostr); size_serialization->serializeBinary(filter_size, ostr);
ostr.write(reinterpret_cast<const char*>(&gin_filter.getFilter()[0]), filterSize * sizeof(RowIDRange)); ostr.write(reinterpret_cast<const char*>(gin_filter.getFilter().data()), filter_size * sizeof(RowIDRange));
} }
} }
@ -79,7 +79,7 @@ void MergeTreeIndexGranuleGinFilter::deserializeBinary(ReadBuffer & istr, MergeT
continue; continue;
gin_filter.getFilter().assign(filterSize, {}); gin_filter.getFilter().assign(filterSize, {});
istr.read(reinterpret_cast<char*>(&gin_filter.getFilter()[0]), filterSize * sizeof(RowIDRange)); istr.read(reinterpret_cast<char*>(gin_filter.getFilter().data()), filterSize * sizeof(RowIDRange));
} }
has_elems = true; has_elems = true;
} }
@ -132,8 +132,8 @@ void MergeTreeIndexAggregatorGinFilter::update(const Block & block, size_t * pos
+ toString(*pos) + ", Block rows: " + toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR); + toString(*pos) + ", Block rows: " + toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);
size_t rows_read = std::min(limit, block.rows() - *pos); size_t rows_read = std::min(limit, block.rows() - *pos);
auto rowID = store->getNextRowIDRange(rows_read); auto row_id = store->getNextRowIDRange(rows_read);
auto startRowID = rowID; auto start_row_id = row_id;
for (size_t col = 0; col < index_columns.size(); ++col) for (size_t col = 0; col < index_columns.size(); ++col)
{ {
@ -156,11 +156,11 @@ void MergeTreeIndexAggregatorGinFilter::update(const Block & block, size_t * pos
for (size_t row_num = 0; row_num < elements_size; ++row_num) for (size_t row_num = 0; row_num < elements_size; ++row_num)
{ {
auto ref = column_key.getDataAt(element_start_row + row_num); auto ref = column_key.getDataAt(element_start_row + row_num);
addToGinFilter(rowID, ref.data, ref.size, granule->gin_filters[col]); addToGinFilter(row_id, ref.data, ref.size, granule->gin_filters[col]);
store->addSize(ref.size); store->addSize(ref.size);
} }
current_position += 1; current_position += 1;
rowID++; row_id++;
if (store->needToWrite()) if (store->needToWrite())
need_to_write = true; need_to_write = true;
@ -171,14 +171,14 @@ void MergeTreeIndexAggregatorGinFilter::update(const Block & block, size_t * pos
for (size_t i = 0; i < rows_read; ++i) for (size_t i = 0; i < rows_read; ++i)
{ {
auto ref = column->getDataAt(current_position + i); auto ref = column->getDataAt(current_position + i);
addToGinFilter(rowID, ref.data, ref.size, granule->gin_filters[col]); addToGinFilter(row_id, ref.data, ref.size, granule->gin_filters[col]);
store->addSize(ref.size); store->addSize(ref.size);
rowID++; row_id++;
if (store->needToWrite()) if (store->needToWrite())
need_to_write = true; need_to_write = true;
} }
} }
granule->gin_filters[col].addRowRangeToGinFilter(store->getCurrentSegmentID(), startRowID, startRowID + rows_read - 1); granule->gin_filters[col].addRowRangeToGinFilter(store->getCurrentSegmentID(), start_row_id, start_row_id + rows_read - 1);
if (need_to_write) if (need_to_write)
{ {
store->writeSegment(); store->writeSegment();
@ -257,7 +257,7 @@ bool MergeTreeConditionGinFilter::alwaysUnknownOrTrue() const
return rpn_stack[0]; return rpn_stack[0];
} }
bool MergeTreeConditionGinFilter::mayBeTrueOnGranuleInPart(MergeTreeIndexGranulePtr idx_granule,[[maybe_unused]] PostingsCacheForStore &cache_in_store) const bool MergeTreeConditionGinFilter::mayBeTrueOnGranuleInPart(MergeTreeIndexGranulePtr idx_granule,[[maybe_unused]] PostingsCacheForStore &cache_store) const
{ {
std::shared_ptr<MergeTreeIndexGranuleGinFilter> granule std::shared_ptr<MergeTreeIndexGranuleGinFilter> granule
= std::dynamic_pointer_cast<MergeTreeIndexGranuleGinFilter>(idx_granule); = std::dynamic_pointer_cast<MergeTreeIndexGranuleGinFilter>(idx_granule);
@ -277,7 +277,7 @@ bool MergeTreeConditionGinFilter::mayBeTrueOnGranuleInPart(MergeTreeIndexGranule
|| element.function == RPNElement::FUNCTION_NOT_EQUALS || element.function == RPNElement::FUNCTION_NOT_EQUALS
|| element.function == RPNElement::FUNCTION_HAS) || element.function == RPNElement::FUNCTION_HAS)
{ {
rpn_stack.emplace_back(granule->gin_filters[element.key_column].contains(*element.gin_filter, cache_in_store), true); rpn_stack.emplace_back(granule->gin_filters[element.key_column].contains(*element.gin_filter, cache_store), true);
if (element.function == RPNElement::FUNCTION_NOT_EQUALS) if (element.function == RPNElement::FUNCTION_NOT_EQUALS)
rpn_stack.back() = !rpn_stack.back(); rpn_stack.back() = !rpn_stack.back();
@ -293,7 +293,7 @@ bool MergeTreeConditionGinFilter::mayBeTrueOnGranuleInPart(MergeTreeIndexGranule
const auto & gin_filters = element.set_gin_filters[column]; const auto & gin_filters = element.set_gin_filters[column];
for (size_t row = 0; row < gin_filters.size(); ++row) for (size_t row = 0; row < gin_filters.size(); ++row)
result[row] = result[row] && granule->gin_filters[key_idx].contains(gin_filters[row], cache_in_store); result[row] = result[row] && granule->gin_filters[key_idx].contains(gin_filters[row], cache_store);
} }
rpn_stack.emplace_back( rpn_stack.emplace_back(
@ -308,7 +308,7 @@ bool MergeTreeConditionGinFilter::mayBeTrueOnGranuleInPart(MergeTreeIndexGranule
const auto & gin_filters = element.set_gin_filters[0]; const auto & gin_filters = element.set_gin_filters[0];
for (size_t row = 0; row < gin_filters.size(); ++row) for (size_t row = 0; row < gin_filters.size(); ++row)
result[row] = result[row] && granule->gin_filters[element.key_column].contains(gin_filters[row], cache_in_store); result[row] = result[row] && granule->gin_filters[element.key_column].contains(gin_filters[row], cache_store);
rpn_stack.emplace_back( rpn_stack.emplace_back(
std::find(std::cbegin(result), std::cend(result), true) != std::end(result), true); std::find(std::cbegin(result), std::cend(result), true) != std::end(result), true);
@ -732,7 +732,7 @@ MergeTreeIndexPtr ginIndexCreator(
{ {
if (index.type == GinFilter::getName()) if (index.type == GinFilter::getName())
{ {
size_t n = index.arguments.size() == 0 ? 0 : index.arguments[0].get<size_t>(); size_t n = index.arguments.empty() ? 0 : index.arguments[0].get<size_t>();
GinFilterParameters params(n); GinFilterParameters params(n);
/// Use SplitTokenExtractor when n is 0, otherwise use NgramTokenExtractor /// Use SplitTokenExtractor when n is 0, otherwise use NgramTokenExtractor
@ -783,7 +783,7 @@ void ginIndexValidator(const IndexDescription & index, bool /*attach*/)
if (index.arguments.size() == 1 and index.arguments[0].getType() != Field::Types::UInt64) if (index.arguments.size() == 1 and index.arguments[0].getType() != Field::Types::UInt64)
throw Exception("Gin index argument must be positive integer.", ErrorCodes::INCORRECT_QUERY); throw Exception("Gin index argument must be positive integer.", ErrorCodes::INCORRECT_QUERY);
size_t ngrams = index.arguments.size() == 0 ? 0 : index.arguments[0].get<size_t>(); size_t ngrams = index.arguments.empty() ? 0 : index.arguments[0].get<size_t>();
/// Just validate /// Just validate
GinFilterParameters params(ngrams); GinFilterParameters params(ngrams);

View File

@ -171,7 +171,7 @@ public:
MergeTreeIndexGranulePtr createIndexGranule() const override; MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override; MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
MergeTreeIndexAggregatorPtr createIndexAggregatorForPart(const GinIndexStorePtr &partIndexInfo) const override; MergeTreeIndexAggregatorPtr createIndexAggregatorForPart(const GinIndexStorePtr &store) const override;
MergeTreeIndexConditionPtr createIndexCondition( MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, ContextPtr context) const override; const SelectQueryInfo & query, ContextPtr context) const override;

View File

@ -1,6 +1,5 @@
SET log_queries = 1; SET log_queries = 1;
SET max_threads = 1; SET max_threads = 1;
TRUNCATE system.query_log;
-- create table for gin(2) -- create table for gin(2)
DROP TABLE IF EXISTS simple1; DROP TABLE IF EXISTS simple1;