"""Reciprocal Rank Fusion (RRF) rerankers for combining multiple ranked lists.

This module provides RRF-based rerankers that fuse results from multiple
sources by computing scores based on document ranks rather than raw scores.
This makes RRF robust to score scale differences between sources.

Classes
-------
RrfReranker
    RRF implementation with optional source weighting.

Example Usage
-------------
::

    from zvec_db.rerankers import RrfReranker

    # Basic RRF (equal weights for all sources)
    reranker = RrfReranker(topn=10, rank_constant=60)
    results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

    # Weighted RRF (favor dense over sparse)
    reranker = RrfReranker(
        topn=10,
        rank_constant=60,
        weights={"dense": 0.7, "bm25": 0.3}
    )
    results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})
"""
# NOTE: the docstring must precede the __future__ import; placed after it,
# the string is a no-op expression and the module's __doc__ would be None.
from __future__ import annotations
import heapq
import logging
import warnings
from typing import TYPE_CHECKING, Optional, Union
from zvec.model.doc import Doc
from zvec.typing import MetricType
from ..base import FusionRerankerBase
if TYPE_CHECKING:
from zvec.model.schema import CollectionSchema
logger = logging.getLogger(__name__)
# RRF rank constant: default value from the original RRF paper.
# Higher values (e.g., 100) reduce the impact of early ranks,
# lower values (e.g., 20) give more weight to top-ranked documents.
# The value 60 is a commonly used default that balances both extremes.
DEFAULT_RANK_CONSTANT: int = 60
class RrfReranker(FusionRerankerBase):
    r"""Reciprocal Rank Fusion (RRF) reranker with optional source weighting.

    RRF combines results from multiple ranked lists by computing a fused score
    based on the reciprocal of each document's rank:

    .. math::

        \text{RRF}(d) = \sum_{r \in R} w_r \times \frac{1}{k + \text{rank}(d, r)}

    where:

    - :math:`k` is the ``rank_constant`` (default: 60)
    - :math:`w_r` is the weight for source :math:`r` (default: 1.0)

    By default, all sources have equal weight. Use the ``weights`` parameter
    to favor certain sources over others.

    Args:
        topn (int, optional): Number of top documents to return. Defaults to 10.
        rerank_field (Optional[str], optional): Ignored by RRF. Defaults to None.
        rank_constant (int, optional): Smoothing constant :math:`k` in the RRF
            formula. Larger values reduce the impact of early ranks. Defaults to 60.
        weights (Optional[dict[str, float]], optional): Weight per source.
            Sources not listed use weight 1.0. Defaults to None (equal weights).
        normalize (Optional[Union[bool, str, dict]], optional): **Ignored for RRF**.
            RRF uses ranks, not scores, so normalization has no effect. Setting this
            parameter will emit a warning. Defaults to None.
        metrics (optional): Per-source metric type(s), forwarded to the base
            class; used via ``_convert_metric`` to orient raw scores before
            ranking. Defaults to None.
        schema (Optional[CollectionSchema], optional): Collection schema,
            forwarded to the base class. Defaults to None.

    Example:
        >>> # Basic RRF with default parameters
        >>> reranker = RrfReranker(topn=10)
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

        >>> # Weighted RRF: favor dense embeddings (70%) over BM25 (30%)
        >>> reranker = RrfReranker(
        ...     topn=10,
        ...     weights={"dense": 0.7, "bm25": 0.3}
        ... )
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

        >>> # Custom rank constant (higher = more uniform ranking)
        >>> reranker = RrfReranker(topn=10, rank_constant=100)
        >>> results = reranker.rerank({"bm25": bm25_docs, "dense": dense_docs})

    Note:
        RRF uses only document **ranks**, not raw scores. This makes it robust
        to score scale differences between sources (e.g., BM25 scores vs.
        cosine similarities). Normalization is not applicable to RRF.

    See Also:
        WeightedReranker: For weighted fusion based on scores rather than ranks.
    """

    def __init__(
        self,
        topn: int = 10,
        rerank_field: Optional[str] = None,
        rank_constant: int = DEFAULT_RANK_CONSTANT,
        weights: Optional[dict[str, float]] = None,
        normalize: Optional[Union[bool, str, dict]] = None,
        metrics: Optional[
            Union[MetricType, dict[str, Union[str, MetricType, None]]]
        ] = None,
        schema: Optional["CollectionSchema"] = None,
    ):
        super().__init__(
            topn=topn, rerank_field=rerank_field, schema=schema, metrics=metrics
        )
        self._rank_constant = rank_constant
        self._weights = weights or {}
        # RRF operates on rank positions only, so score normalization is a
        # no-op here; warn the caller instead of silently ignoring the flag.
        # (``normalize=False`` is treated as "explicitly off" and not warned.)
        if normalize is not None and normalize is not False:
            warnings.warn(
                "The 'normalize' parameter has no effect on RrfReranker. "
                "RRF uses document ranks, not raw scores, so normalization is not applicable. "
                "Use WeightedReranker if you need score-based normalization.",
                UserWarning,
                stacklevel=2,
            )

    @property
    def rank_constant(self) -> int:
        """int: Smoothing constant :math:`k` used in the RRF formula."""
        return self._rank_constant

    @property
    def weights(self) -> dict[str, float]:
        """dict[str, float]: Per-source weights (empty dict = equal weights)."""
        return self._weights

    @property
    def normalize(self) -> Optional[Union[bool, str, dict]]:
        """Always None: normalize is not used by RRF, exposed for API consistency."""
        return None

    def _rrf_score(self, rank: int, weight: float = 1.0) -> float:
        """Compute weighted RRF score for a given rank.

        Args:
            rank (int): Document rank (0-indexed).
            weight (float, optional): Source weight. Defaults to 1.0.

        Returns:
            float: Weighted RRF score, ``weight / (k + rank + 1)``.
        """
        # "+ 1" converts the 0-indexed rank to the 1-indexed rank the
        # RRF formula expects.
        return weight / (self._rank_constant + rank + 1)

    def rerank(
        self, query_results: dict[str, list[Doc]], query: Optional[str] = None
    ) -> list[Doc]:
        """Apply Reciprocal Rank Fusion to combine multiple query results.

        Args:
            query_results (dict[str, list[Doc]]): Results from one or more vector queries.
                Keys are source names (e.g., "bm25", "dense"), values are ranked
                document lists.
            query (Optional[str], optional): Ignored. Defaults to None.

        Returns:
            list[Doc]: Reranked documents with RRF scores in the ``score`` field,
            sorted by descending score.

        Example:
            >>> reranker = RrfReranker(topn=5)
            >>> results = reranker.rerank({
            ...     "bm25": bm25_results,
            ...     "dense": dense_results
            ... })
            >>> print(f"Top document: {results[0].id} (score: {results[0].score:.4f})")
        """
        del query  # RRF only uses rank positions, not the query text
        rrf_scores: dict[str, float] = {}
        id_to_doc: dict[str, Doc] = {}
        for source_name, query_result in query_results.items():
            if not query_result:
                continue
            # Get weight for this source (default: 1.0)
            weight = self._weights.get(source_name, 1.0)
            # Convert raw scores to a common higher-is-better orientation so
            # that sorting descending puts the best document at rank 0,
            # regardless of the source's metric (distance vs. similarity).
            converted_docs = []
            for doc in query_result:
                # Explicit None check: a legitimate score of 0.0 is falsy and
                # must not be confused with a missing score.
                raw_score = float(doc.score) if doc.score is not None else 0.0
                converted_score = self._convert_metric(raw_score, source_name)
                converted_docs.append((doc, converted_score))
            converted_docs.sort(key=lambda pair: pair[1], reverse=True)
            # Accumulate the weighted reciprocal-rank contribution of each
            # document; documents seen in several sources sum contributions.
            for rank, (doc, _) in enumerate(converted_docs):
                doc_id = doc.id
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0.0) + self._rrf_score(
                    rank, weight
                )
                # Keep the first Doc instance encountered for each id.
                id_to_doc.setdefault(doc_id, doc)
        # nlargest avoids fully sorting all fused candidates when topn is small.
        top_docs = heapq.nlargest(self.topn, rrf_scores.items(), key=lambda item: item[1])
        return [
            id_to_doc[doc_id]._replace(score=rrf_score)
            for doc_id, rrf_score in top_docs
        ]