Source code for zvec_db.rerankers.fusion.rrf

"""Reciprocal Rank Fusion (RRF) rerankers for combining multiple ranked lists.

This module provides RRF-based rerankers that fuse results from multiple
sources by computing scores based on document ranks rather than raw scores.
This makes RRF robust to score scale differences between sources.

Classes
-------
RrfReranker
    RRF implementation with optional source weighting.

Example Usage
-------------
::

    from zvec_db.rerankers import RrfReranker

    # Basic RRF (equal weights for all sources)
    reranker = RrfReranker(topn=10, rank_constant=60)
    results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

    # Weighted RRF (favor dense over sparse)
    reranker = RrfReranker(
        topn=10,
        rank_constant=60,
        weights={"dense": 0.7, "bm25": 0.3}
    )
    results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})
"""

from __future__ import annotations

import heapq
import logging
import warnings
from typing import TYPE_CHECKING, Optional, Union

from zvec.model.doc import Doc
from zvec.typing import MetricType

from ..base import FusionRerankerBase

if TYPE_CHECKING:
    from zvec.model.schema import CollectionSchema

logger = logging.getLogger(__name__)


# RRF rank constant: default value from the original RRF paper.
# Higher values (e.g., 100) reduce the impact of early ranks,
# lower values (e.g., 20) give more weight to top-ranked documents.
# The value 60 is a commonly used default that balances both extremes.
DEFAULT_RANK_CONSTANT = 60
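
# A quick numeric illustration of that trade-off (each entry is just
# 1 / (k + rank + 1) for a 0-indexed rank; values rounded to 4 places):
#
#   rank (0-indexed)    k=20      k=60      k=100
#   0                   0.0476    0.0164    0.0099
#   1                   0.0455    0.0161    0.0098
#   9                   0.0333    0.0143    0.0091
#
# With k=20 the gap between adjacent ranks is large, so top-ranked
# documents dominate; with k=100 the scores flatten and sources blend
# more evenly.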


class RrfReranker(FusionRerankerBase):
    r"""Reciprocal Rank Fusion (RRF) reranker with optional source weighting.

    RRF combines results from multiple ranked lists by computing a fused
    score based on the reciprocal of each document's rank:

    .. math::

        \text{RRF}(d) = \sum_{r \in R} w_r \times \frac{1}{k + \text{rank}(d, r)}

    where:

    - :math:`k` is the ``rank_constant`` (default: 60)
    - :math:`w_r` is the weight for source :math:`r` (default: 1.0)

    By default, all sources have equal weight. Use the ``weights`` parameter
    to favor certain sources over others.

    Args:
        topn (int, optional): Number of top documents to return.
            Defaults to 10.
        rerank_field (Optional[str], optional): Ignored by RRF.
            Defaults to None.
        rank_constant (int, optional): Smoothing constant :math:`k` in the
            RRF formula. Larger values reduce the impact of early ranks.
            Defaults to 60.
        weights (Optional[dict[str, float]], optional): Weight per source.
            Sources not listed use weight 1.0. Defaults to None
            (equal weights).
        normalize (Optional[Union[bool, str, dict]], optional): **Ignored
            for RRF**. RRF uses ranks, not scores, so normalization has no
            effect. Setting this parameter will emit a warning.
            Defaults to None.
        metrics (optional): Metric type(s) used to orient raw scores before
            ranking; forwarded to ``FusionRerankerBase``. Defaults to None.
        schema (Optional[CollectionSchema], optional): Collection schema;
            forwarded to ``FusionRerankerBase``. Defaults to None.

    Example:
        >>> # Basic RRF with default parameters
        >>> reranker = RrfReranker(topn=10)
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

        >>> # Weighted RRF: favor dense embeddings (70%) over BM25 (30%)
        >>> reranker = RrfReranker(
        ...     topn=10,
        ...     weights={"dense": 0.7, "bm25": 0.3}
        ... )
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

        >>> # Custom rank constant (higher = more uniform ranking)
        >>> reranker = RrfReranker(topn=10, rank_constant=100)
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

    Note:
        RRF uses only document **ranks**, not raw scores. This makes it
        robust to score scale differences between sources (e.g., BM25
        scores vs. cosine similarities). Normalization is not applicable
        to RRF.

    See Also:
        WeightedReranker: For weighted fusion based on scores rather than
        ranks.
    """

    def __init__(
        self,
        topn: int = 10,
        rerank_field: Optional[str] = None,
        rank_constant: int = DEFAULT_RANK_CONSTANT,
        weights: Optional[dict[str, float]] = None,
        normalize: Optional[Union[bool, str, dict]] = None,
        metrics: Optional[
            Union[MetricType, dict[str, Union[str, MetricType, None]]]
        ] = None,
        schema: Optional["CollectionSchema"] = None,
    ):
        super().__init__(
            topn=topn, rerank_field=rerank_field, schema=schema, metrics=metrics
        )
        self._rank_constant = rank_constant
        self._weights = weights or {}

        # Warn if normalize is set - RRF doesn't use scores, only ranks
        if normalize is not None and normalize is not False:
            warnings.warn(
                "The 'normalize' parameter has no effect on RrfReranker. "
                "RRF uses document ranks, not raw scores, so normalization "
                "is not applicable. Use WeightedReranker if you need "
                "score-based normalization.",
                UserWarning,
                stacklevel=2,
            )

    @property
    def rank_constant(self) -> int:
        return self._rank_constant

    @property
    def weights(self) -> dict[str, float]:
        return self._weights

    @property
    def normalize(self) -> Optional[Union[bool, str, dict]]:
        # normalize is not used by RRF, but exposed for API consistency
        return None

    def _rrf_score(self, rank: int, weight: float = 1.0) -> float:
        """Compute weighted RRF score for a given rank.

        Args:
            rank (int): Document rank (0-indexed).
            weight (float, optional): Source weight. Defaults to 1.0.

        Returns:
            float: Weighted RRF score.
        """
        # ``rank`` is 0-indexed, so ``rank + 1`` is the 1-indexed rank used
        # in the RRF formula: weight / (k + rank_1indexed).
        return weight / (self._rank_constant + rank + 1)

    def rerank(
        self, query_results: dict[str, list[Doc]], query: Optional[str] = None
    ) -> list[Doc]:
        """Apply Reciprocal Rank Fusion to combine multiple query results.

        Args:
            query_results (dict[str, list[Doc]]): Results from one or more
                vector queries. Keys are source names (e.g., "bm25",
                "dense"), values are ranked document lists.
            query (Optional[str], optional): Ignored. Defaults to None.

        Returns:
            list[Doc]: Reranked documents with RRF scores in the ``score``
                field, sorted by descending score.

        Example:
            >>> reranker = RrfReranker(topn=5)
            >>> results = reranker.rerank({
            ...     "bm25": bm25_results,
            ...     "dense": dense_results
            ... })
            >>> print(f"Top document: {results[0].id} (score: {results[0].score:.4f})")
        """
        del query  # RRF only uses rank positions, not the query text

        rrf_scores: dict[str, float] = {}
        id_to_doc: dict[str, Doc] = {}

        for source_name, query_result in query_results.items():
            if not query_result:
                continue

            # Get weight for this source (default: 1.0)
            weight = self._weights.get(source_name, 1.0)

            # Convert scores and sort documents by converted score
            # (descending) so that rank 0 is the best document.
            converted_docs = []
            for doc in query_result:
                raw_score = float(doc.score) if doc.score else 0.0
                converted_score = self._convert_metric(raw_score, source_name)
                converted_docs.append((doc, converted_score))

            # Sort by converted score descending (best first)
            converted_docs.sort(key=lambda x: x[1], reverse=True)

            # Compute RRF scores based on rank positions
            for rank, (doc, _) in enumerate(converted_docs):
                doc_id = doc.id
                rrf_score = self._rrf_score(rank, weight)
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0.0) + rrf_score
                if doc_id not in id_to_doc:
                    id_to_doc[doc_id] = doc

        # Pick the topn documents by fused score without a full sort
        top_docs = heapq.nlargest(self.topn, rrf_scores.items(), key=lambda x: x[1])

        results: list[Doc] = []
        for doc_id, rrf_score in top_docs:
            doc = id_to_doc[doc_id]
            results.append(doc._replace(score=rrf_score))
        return results
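
# A minimal, self-contained sketch of the fusion arithmetic above, using
# plain document ids instead of Doc objects. This is illustrative only and
# not part of the public API; the toy ids, lists, and weights below are
# made up for the demonstration. It mirrors what ``rerank`` computes for
# two ranked sources.
if __name__ == "__main__":
    k = DEFAULT_RANK_CONSTANT
    ranked_lists = {
        "bm25": ["d1", "d2", "d3"],
        "dense": ["d3", "d1", "d4"],
    }
    source_weights = {"dense": 0.7, "bm25": 0.3}

    fused: dict[str, float] = {}
    for source, ids in ranked_lists.items():
        w = source_weights.get(source, 1.0)
        for rank, doc_id in enumerate(ids):
            # Same formula as RrfReranker._rrf_score: w / (k + rank + 1)
            fused[doc_id] = fused.get(doc_id, 0.0) + w / (k + rank + 1)

    # d3 and d1 come out on top: each sits near the head of one list.
    for doc_id, score in sorted(fused.items(), key=lambda x: x[1], reverse=True):
        print(f"{doc_id}: {score:.4f}")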