Source code for churro_ocr.providers.page_detection

"""Built-in page detection backends."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from io import BytesIO
from typing import TYPE_CHECKING, cast

from churro_ocr._internal.install import install_command_hint
from churro_ocr._internal.litellm import LiteLLMTransport
from churro_ocr._internal.logging import logger
from churro_ocr._internal.retry import retry_api_call
from churro_ocr._internal.runtime import run_sync
from churro_ocr.page_detection import PageCandidate, PageDetectionBackend
from churro_ocr.prompts import DEFAULT_BOUNDARY_DETECTION_PROMPT
from churro_ocr.prompts.layout import (
    build_boundary_review_prompt,
    build_text_block_boundary_review_prompt,
    build_text_block_localization_prompt,
)
from churro_ocr.providers import _page_detection_helpers as _helpers
from churro_ocr.providers import _page_detection_review as _review
from churro_ocr.providers.specs import LiteLLMTransportConfig

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable

    from PIL import Image

    from churro_ocr.types import BoundingBox

# Module-local aliases for the private helpers defined in
# ``churro_ocr.providers._page_detection_helpers``. The code below refers to
# these short names instead of going through ``_helpers.`` each time.
# NOTE(review): several aliases are not referenced in the visible part of this
# module; presumably they are kept so existing importers (or tests that patch
# names on this module) keep working — confirm before pruning.
_AzureAnalyzeResultLike = _helpers._AzureAnalyzeResultLike
_bbox_from_polygon = _helpers._bbox_from_polygon
_bbox_to_polygon = _helpers._bbox_to_polygon
_BoxReviewDecision = _helpers._BoxReviewDecision
_boxes_equal = _helpers._boxes_equal
_build_box_review_preview = _helpers._build_box_review_preview
_build_edge_strip_review_preview = _helpers._build_edge_strip_review_preview
_configuration_error = _helpers._configuration_error
_convert_source_box_to_review_crop_box = _helpers._convert_source_box_to_review_crop_box
_EDGE_NAMES = _helpers._EDGE_NAMES
_EdgeReviewDecision = _helpers._EdgeReviewDecision
_full_image_candidate = _helpers._full_image_candidate
_map_review_crop_box_to_source_box = _helpers._map_review_crop_box_to_source_box
_merge_instruction_prompts = _helpers._merge_instruction_prompts
_normalize_azure_page_polygon = _helpers._normalize_azure_page_polygon
_PageBox = _helpers._PageBox
_PAGE_DETECTION_BOX_WIDTH = _helpers._PAGE_DETECTION_BOX_WIDTH
_PageDetectionTransform = _helpers._PageDetectionTransform
_parse_page_boxes_json = _helpers._parse_page_boxes_json
_parse_single_edge_review_decision_json = _helpers._parse_single_edge_review_decision_json
_parse_text_block_box_json = _helpers._parse_text_block_box_json
_parse_text_block_edge_review_decision_json = _helpers._parse_text_block_edge_review_decision_json
_prepare_detection_image = _helpers._prepare_detection_image
_provider_error = _helpers._provider_error
_strip_code_fence = _helpers._strip_code_fence
_TEXT_BLOCK_DETECTION_BOX_WIDTH = _helpers._TEXT_BLOCK_DETECTION_BOX_WIDTH
_TEXT_BLOCK_REVIEW_CROP_MARGIN_FRACTION = _helpers._TEXT_BLOCK_REVIEW_CROP_MARGIN_FRACTION
_type_error = _helpers._type_error
_value_error = _helpers._value_error
# Aliases for the review-loop helpers defined in
# ``churro_ocr.providers._page_detection_review``.
_apply_box_review_decision = _review._apply_box_review_decision
_apply_edge_decision_to_coordinate = _review._apply_edge_decision_to_coordinate
_apply_page_review_stop_condition = _review._apply_page_review_stop_condition
_convert_strip_delta_to_local_delta = _review._convert_strip_delta_to_local_delta
_is_oscillating_magnitude = _review._is_oscillating_magnitude
_log_box_history = _review._log_box_history
_new_page_review_stop_state = _review._new_page_review_stop_state
_no_change_edge_review_decision = _review._no_change_edge_review_decision
_page_review_is_fully_frozen = _review._page_review_is_fully_frozen
_select_more_expansive_oscillation_coordinate = _review._select_more_expansive_oscillation_coordinate
_strip_axis_size_pixels = _review._strip_axis_size_pixels
# Accepted forms of a ``transport`` argument: a transport config, an
# already-constructed transport, or ``None``. This union is evaluated at import
# time (it is a plain assignment, not an annotation).
LiteLLMTransportLike = LiteLLMTransportConfig | LiteLLMTransport | None


async def _complete_page_boxes(
    *,
    model: str,
    image: Image.Image,
    system_prompt: str,
    user_prompt: str | None,
    transport: LiteLLMTransport,
) -> list[_PageBox]:
    """Request the initial page boxes for ``image`` from the model.

    The system and user prompts are combined with ``_merge_instruction_prompts``
    and sent as a single user prompt (no separate system prompt is passed to
    the transport). The raw response is logged and then parsed.

    :param model: Model identifier forwarded to ``transport.complete_text``.
    :param image: Image attached to the request.
    :param system_prompt: Instruction text merged into the user prompt.
    :param user_prompt: Optional additional prompt text merged in as well.
    :param transport: Transport used to build the messages and run the request.
    :returns: Whatever ``_parse_page_boxes_json`` extracts from the response.
    """
    combined_prompt = _merge_instruction_prompts(system_prompt, user_prompt)
    request_messages = transport.prepare_messages(
        system_prompt=None,
        user_prompt=combined_prompt,
        images=[image],
    )
    raw_response = await transport.complete_text(
        model=model,
        messages=request_messages,
        output_json=True,
    )
    logger.info("Initial LLM page-detection response: %s", raw_response)
    parsed_boxes = _parse_page_boxes_json(raw_response)
    return parsed_boxes


async def _complete_text_block_box(
    *,
    model: str,
    image: Image.Image,
    block_tag: str,
    block_text: str,
    transport: LiteLLMTransport,
) -> _PageBox | None:
    """Request the initial box of one text block in ``image`` from the model.

    :param model: Model identifier forwarded to ``transport.complete_text``.
    :param image: Image attached to the request.
    :param block_tag: Block tag passed to the localization prompt builder.
    :param block_text: Block text passed to the localization prompt builder.
    :param transport: Transport used to build the messages and run the request.
    :returns: The result of ``_parse_text_block_box_json`` on the raw response,
        which may be ``None``.
    """
    localization_prompt = build_text_block_localization_prompt(
        block_tag=block_tag,
        block_text=block_text,
    )
    request_messages = transport.prepare_messages(
        system_prompt=None,
        user_prompt=localization_prompt,
        images=[image],
    )
    raw_response = await transport.complete_text(
        model=model,
        messages=request_messages,
        output_json=True,
    )
    logger.info("Initial LLM text-block-localization response: %s", raw_response)
    located_box = _parse_text_block_box_json(raw_response)
    return located_box


async def _review_single_edge_from_strip(
    *,
    model: str,
    review_image: Image.Image,
    strip_image: Image.Image,
    strip_bounds: tuple[int, int, int, int],
    edge_name: str,
    page_index: int,
    history_steps: int,
    round_index: int,
    transport: LiteLLMTransport,
) -> _EdgeReviewDecision:
    """Ask the model to review one edge of a page box using its strip image.

    Only ``strip_image`` is sent to the model. ``review_image`` is consulted
    solely for its width/height so the returned amount can be rescaled from the
    strip's axis to the review image's axis. ``history_steps`` and
    ``round_index`` are used for logging only.

    :param model: Model identifier forwarded to ``transport.complete_text``.
    :param review_image: Review preview whose size defines the local axis length.
    :param strip_image: Edge strip image attached to the request.
    :param strip_bounds: Strip bounds handed to ``_strip_axis_size_pixels``.
    :param edge_name: Edge under review; ``"left"``/``"right"`` are treated as
        horizontal, anything else as vertical.
    :param page_index: Page index expected in the model's response.
    :param history_steps: Number of history entries so far (logged).
    :param round_index: Review round number (logged).
    :param transport: Transport used to build the messages and run the request.
    :returns: The model's action with its amount converted to the review
        image's axis.
    :raises ValueError: Via ``_value_error`` when the strip axis size is not
        positive.
    """
    strip_axis_pixels = _strip_axis_size_pixels(strip_bounds, edge_name=edge_name)
    if strip_axis_pixels <= 0:
        message = f"Invalid strip axis size for edge '{edge_name}'."
        raise _value_error(message)

    # Left/right edges move along x; top/bottom edges move along y.
    moves_along_x = edge_name in {"left", "right"}
    if moves_along_x:
        axis_description = "horizontal (x-axis / strip width)"
    else:
        axis_description = "vertical (y-axis / strip height)"

    review_prompt = build_boundary_review_prompt(
        edge_name=edge_name,
        page_index=page_index,
        strip_axis=axis_description,
    )
    request_messages = transport.prepare_messages(
        system_prompt=None,
        user_prompt=review_prompt,
        images=[strip_image],
    )
    raw_response = await transport.complete_text(
        model=model,
        messages=request_messages,
        output_json=True,
    )
    logger.info(
        "Review LLM single-edge response (round=%s, page=%s, edge=%s, history rounds=%s): %s",
        round_index,
        page_index,
        edge_name,
        history_steps,
        raw_response,
    )

    parsed = _parse_single_edge_review_decision_json(raw_response)
    echoed_page_index, echoed_edge_name, strip_decision = parsed
    # Mismatches in what the model echoed back are only logged; the caller's
    # expected page index and edge stay authoritative.
    if echoed_page_index != page_index:
        logger.info(
            "Single-edge review page_index mismatch (expected=%s, got=%s) for edge=%s; using expected.",
            page_index,
            echoed_page_index,
            edge_name,
        )
    if echoed_edge_name != edge_name:
        logger.info(
            "Single-edge review edge mismatch (expected=%s, got=%s); using expected edge.",
            edge_name,
            echoed_edge_name,
        )

    if moves_along_x:
        local_axis_pixels = review_image.width
    else:
        local_axis_pixels = review_image.height
    rescaled_amount = _convert_strip_delta_to_local_delta(
        strip_decision.amount,
        strip_axis_pixels=strip_axis_pixels,
        local_axis_pixels=local_axis_pixels,
    )
    return _EdgeReviewDecision(action=strip_decision.action, amount=rescaled_amount)


async def _review_single_text_block_edge_from_strip(
    *,
    model: str,
    review_image: Image.Image,
    strip_image: Image.Image,
    strip_bounds: tuple[int, int, int, int],
    edge_name: str,
    block_tag: str,
    block_text: str,
    history_steps: int,
    round_index: int,
    transport: LiteLLMTransport,
) -> _EdgeReviewDecision:
    """Ask the model to review one edge of a text-block box using its strip image.

    Only ``strip_image`` is sent to the model. ``review_image`` is consulted
    solely for its width/height so the returned amount can be rescaled from the
    strip's axis to the review image's axis. ``history_steps`` and
    ``round_index`` are used for logging only.

    :param model: Model identifier forwarded to ``transport.complete_text``.
    :param review_image: Review preview whose size defines the local axis length.
    :param strip_image: Edge strip image attached to the request.
    :param strip_bounds: Strip bounds handed to ``_strip_axis_size_pixels``.
    :param edge_name: Edge under review; ``"left"``/``"right"`` are treated as
        horizontal, anything else as vertical.
    :param block_tag: Block tag passed to the review prompt builder.
    :param block_text: Block text passed to the review prompt builder.
    :param history_steps: Number of history entries so far (logged).
    :param round_index: Review round number (logged).
    :param transport: Transport used to build the messages and run the request.
    :returns: The model's action with its amount converted to the review
        image's axis.
    :raises ValueError: Via ``_value_error`` when the strip axis size is not
        positive.
    """
    strip_axis_pixels = _strip_axis_size_pixels(strip_bounds, edge_name=edge_name)
    if strip_axis_pixels <= 0:
        message = f"Invalid strip axis size for edge '{edge_name}'."
        raise _value_error(message)

    # Left/right edges move along x; top/bottom edges move along y.
    moves_along_x = edge_name in {"left", "right"}
    if moves_along_x:
        axis_description = "horizontal (x-axis / strip width)"
    else:
        axis_description = "vertical (y-axis / strip height)"

    review_prompt = build_text_block_boundary_review_prompt(
        edge_name=edge_name,
        block_tag=block_tag,
        block_text=block_text,
        strip_axis=axis_description,
    )
    request_messages = transport.prepare_messages(
        system_prompt=None,
        user_prompt=review_prompt,
        images=[strip_image],
    )
    raw_response = await transport.complete_text(
        model=model,
        messages=request_messages,
        output_json=True,
    )
    logger.info(
        "Text-block review LLM single-edge response (round=%s, edge=%s, history rounds=%s): %s",
        round_index,
        edge_name,
        history_steps,
        raw_response,
    )

    echoed_edge_name, strip_decision = _parse_text_block_edge_review_decision_json(raw_response)
    # A mismatching echoed edge is only logged; the caller's edge stays
    # authoritative.
    if echoed_edge_name != edge_name:
        logger.info(
            "Text-block edge-review mismatch (expected=%s, got=%s); using expected edge.",
            edge_name,
            echoed_edge_name,
        )

    if moves_along_x:
        local_axis_pixels = review_image.width
    else:
        local_axis_pixels = review_image.height
    rescaled_amount = _convert_strip_delta_to_local_delta(
        strip_decision.amount,
        strip_axis_pixels=strip_axis_pixels,
        local_axis_pixels=local_axis_pixels,
    )
    return _EdgeReviewDecision(action=strip_decision.action, amount=rescaled_amount)


async def _review_page_box(
    *,
    image: Image.Image,
    current_box: _PageBox,
    history_steps: int,
    round_index: int,
    model: str,
    transport: LiteLLMTransport,
) -> _PageBox:
    """Run one review round for a single page box.

    A review preview is built around ``current_box``, one strip per entry in
    ``_EDGE_NAMES`` is derived from it, and all edges are reviewed concurrently.
    An edge whose review raised is logged and treated as "no change". The
    combined decision is applied to the box in preview coordinates and the
    result is mapped back with ``_map_review_crop_box_to_source_box``.

    :param image: Image the box refers to.
    :param current_box: Box to review.
    :param history_steps: Number of history entries so far (forwarded for logging).
    :param round_index: Review round number (forwarded for logging).
    :param model: Model identifier used for every edge request.
    :param transport: Transport used for every edge request.
    :returns: The reviewed box, carrying ``current_box.page_index``.
    """
    page_index = current_box.page_index
    review_image, crop_bounds = _build_box_review_preview(image, current_box)
    local_box = _convert_source_box_to_review_crop_box(current_box, crop_bounds, image.size)

    # Build every strip before creating any coroutine, so a failure while
    # cropping does not leave coroutines that are never awaited.
    strips = []
    for edge_name in _EDGE_NAMES:
        strip_image, strip_bounds = _build_edge_strip_review_preview(review_image, local_box, edge_name)
        strips.append((edge_name, strip_image, strip_bounds))

    pending_reviews = [
        _review_single_edge_from_strip(
            model=model,
            review_image=review_image,
            strip_image=strip_image,
            strip_bounds=strip_bounds,
            edge_name=edge_name,
            page_index=page_index,
            history_steps=history_steps,
            round_index=round_index,
            transport=transport,
        )
        for edge_name, strip_image, strip_bounds in strips
    ]
    edge_results = await asyncio.gather(*pending_reviews, return_exceptions=True)

    edge_decisions: dict[str, _EdgeReviewDecision] = {}
    for edge_name, outcome in zip(_EDGE_NAMES, edge_results, strict=False):
        if not isinstance(outcome, BaseException):
            edge_decisions[edge_name] = outcome
            continue
        # Best effort: a failed edge leaves that side of the box untouched.
        logger.info(
            "Edge-strip review failed for round %s, page %s, edge %s; using no_change: %s",
            round_index,
            page_index,
            edge_name,
            outcome,
        )
        edge_decisions[edge_name] = _no_change_edge_review_decision()

    combined_decision = _BoxReviewDecision(
        page_index=page_index,
        left=edge_decisions["left"],
        top=edge_decisions["top"],
        right=edge_decisions["right"],
        bottom=edge_decisions["bottom"],
    )
    reviewed_local_box = _apply_box_review_decision(
        local_box,
        combined_decision,
        expected_page_index=page_index,
    )
    return _map_review_crop_box_to_source_box(
        reviewed_local_box,
        crop_bounds,
        image.size,
        page_index=page_index,
    )


async def _review_text_block_box(
    *,
    image: Image.Image,
    current_box: _PageBox,
    block_tag: str,
    block_text: str,
    history_steps: int,
    round_index: int,
    model: str,
    transport: LiteLLMTransport,
) -> _PageBox:
    """Run one review round for a single text-block box.

    A review preview is built around ``current_box`` using the text-block crop
    margin and outline width, one strip per entry in ``_EDGE_NAMES`` is derived
    from it, and all edges are reviewed concurrently. An edge whose review
    raised is logged and treated as "no change". The combined decision is
    applied to the box in preview coordinates and the result is mapped back
    with ``_map_review_crop_box_to_source_box``.

    :param image: Image the box refers to.
    :param current_box: Box to review.
    :param block_tag: Block tag forwarded to each edge review.
    :param block_text: Block text forwarded to each edge review.
    :param history_steps: Number of history entries so far (forwarded for logging).
    :param round_index: Review round number (forwarded for logging).
    :param model: Model identifier used for every edge request.
    :param transport: Transport used for every edge request.
    :returns: The reviewed box, carrying ``current_box.page_index``.
    """
    page_index = current_box.page_index
    review_image, crop_bounds = _build_box_review_preview(
        image,
        current_box,
        margin_fraction=_TEXT_BLOCK_REVIEW_CROP_MARGIN_FRACTION,
        outline_width=_TEXT_BLOCK_DETECTION_BOX_WIDTH,
    )
    local_box = _convert_source_box_to_review_crop_box(current_box, crop_bounds, image.size)

    # Build every strip before creating any coroutine, so a failure while
    # cropping does not leave coroutines that are never awaited.
    strips = []
    for edge_name in _EDGE_NAMES:
        strip_image, strip_bounds = _build_edge_strip_review_preview(
            review_image,
            local_box,
            edge_name,
            outline_width=_TEXT_BLOCK_DETECTION_BOX_WIDTH,
        )
        strips.append((edge_name, strip_image, strip_bounds))

    pending_reviews = [
        _review_single_text_block_edge_from_strip(
            model=model,
            review_image=review_image,
            strip_image=strip_image,
            strip_bounds=strip_bounds,
            edge_name=edge_name,
            block_tag=block_tag,
            block_text=block_text,
            history_steps=history_steps,
            round_index=round_index,
            transport=transport,
        )
        for edge_name, strip_image, strip_bounds in strips
    ]
    edge_results = await asyncio.gather(*pending_reviews, return_exceptions=True)

    edge_decisions: dict[str, _EdgeReviewDecision] = {}
    for edge_name, outcome in zip(_EDGE_NAMES, edge_results, strict=False):
        if not isinstance(outcome, BaseException):
            edge_decisions[edge_name] = outcome
            continue
        # Best effort: a failed edge leaves that side of the box untouched.
        logger.info(
            "Text-block edge-strip review failed for round %s, edge %s; using no_change: %s",
            round_index,
            edge_name,
            outcome,
        )
        edge_decisions[edge_name] = _no_change_edge_review_decision()

    combined_decision = _BoxReviewDecision(
        page_index=page_index,
        left=edge_decisions["left"],
        top=edge_decisions["top"],
        right=edge_decisions["right"],
        bottom=edge_decisions["bottom"],
    )
    reviewed_local_box = _apply_box_review_decision(
        local_box,
        combined_decision,
        expected_page_index=page_index,
    )
    return _map_review_crop_box_to_source_box(
        reviewed_local_box,
        crop_bounds,
        image.size,
        page_index=page_index,
    )


async def _run_review_pipeline(
    *,
    initial_boxes: list[_PageBox],
    max_review_rounds: int,
    review_box: Callable[[_PageBox, int, int], Awaitable[_PageBox]],
    subject_name_singular: str,
    subject_name_plural: str,
) -> list[_PageBox]:
    """Refine ``initial_boxes`` through up to ``max_review_rounds`` review rounds.

    Each round reviews, concurrently, every box whose per-``page_index`` stop
    state is not fully frozen. Frozen boxes and boxes whose review raised are
    carried over unchanged; the rest go through
    ``_apply_page_review_stop_condition``. The loop ends early when every box
    is frozen or when a round leaves the boxes equal (per ``_boxes_equal``) to
    the previous round. The accumulated history is logged before returning.

    :param initial_boxes: Starting boxes; also the first history entry.
    :param max_review_rounds: Upper bound on rounds; negative values run none.
    :param review_box: Coroutine function called as
        ``review_box(box, history_length, round_number)`` with a 1-based round.
    :param subject_name_singular: Singular subject label used in log messages.
    :param subject_name_plural: Plural subject label used in log messages.
    :returns: The boxes produced by the last round that ran, or
        ``initial_boxes`` when no round ran.
    """
    box_history: list[list[_PageBox]] = [initial_boxes]
    stop_states = {box.page_index: _new_page_review_stop_state() for box in initial_boxes}
    latest_boxes = initial_boxes

    for zero_based_round in range(max(0, max_review_rounds)):
        round_number = zero_based_round + 1
        prior_round_boxes = box_history[-1]

        pending_boxes = []
        for box in prior_round_boxes:
            state = stop_states.setdefault(box.page_index, _new_page_review_stop_state())
            if not _page_review_is_fully_frozen(state):
                pending_boxes.append(box)

        if not pending_boxes:
            logger.info(
                "All %s frozen by review stop condition before round %s; stopping reviews.",
                subject_name_plural,
                round_number,
            )
            latest_boxes = prior_round_boxes
            break

        outcomes = await asyncio.gather(
            *(review_box(box, len(box_history), round_number) for box in pending_boxes),
            return_exceptions=True,
        )
        # Keyed by page_index; a later box with the same index overwrites an
        # earlier one.
        outcome_by_page = {}
        for box, outcome in zip(pending_boxes, outcomes, strict=False):
            outcome_by_page[box.page_index] = outcome

        round_boxes: list[_PageBox] = []
        for prior_box in prior_round_boxes:
            page_state = stop_states.setdefault(
                prior_box.page_index,
                _new_page_review_stop_state(),
            )
            if _page_review_is_fully_frozen(page_state):
                next_box = prior_box
            else:
                outcome = outcome_by_page.get(prior_box.page_index)
                if isinstance(outcome, BaseException):
                    logger.info(
                        "Review round %s %s %s failed, keeping prior box: %s",
                        round_number,
                        subject_name_singular,
                        prior_box.page_index,
                        outcome,
                    )
                    next_box = prior_box
                elif outcome is None:
                    next_box = prior_box
                else:
                    next_box = _apply_page_review_stop_condition(
                        prior_box=prior_box,
                        reviewed_box=outcome,
                        page_state=page_state,
                        round_index=round_number,
                        subject_name=subject_name_singular,
                    )
            round_boxes.append(next_box)

        round_boxes.sort(key=lambda candidate: candidate.page_index)
        if not round_boxes:
            break

        latest_boxes = round_boxes
        if _boxes_equal(round_boxes, prior_round_boxes):
            # Nothing moved this round: converged, and the unchanged round is
            # not added to the history.
            break
        box_history.append(round_boxes)

    _log_box_history(box_history, subject_name=subject_name_singular.title())
    return latest_boxes


@dataclass(slots=True)
class LLMPageDetector(PageDetectionBackend):
    """Detect one or more pages via a multimodal LLM prompt.

    :param model: Multimodal model identifier to query through LiteLLM.
    :param system_prompt: System prompt used for the initial page-box request.
    :param prompt_template: Optional user prompt override for the initial request.
    :param transport: Optional LiteLLM transport config.
    :param max_review_rounds: Number of iterative review rounds used to refine
        the initial page boxes.
    """

    model: str
    system_prompt: str = DEFAULT_BOUNDARY_DETECTION_PROMPT
    prompt_template: str | None = None
    transport: LiteLLMTransportConfig | None = None
    max_review_rounds: int = 0

    async def detect(self, image: Image.Image) -> list[PageCandidate]:
        """Detect page candidates from one image.

        :param image: Source image that may contain one or more visible pages.
        :returns: Detected page candidates in reading order. Falls back to a single
            full-image candidate when no page boxes are returned.
        """
        processed_image, transform = _prepare_detection_image(image)
        transport = LiteLLMTransport(self.transport)
        boxes = await _complete_page_boxes(
            model=self.model,
            image=processed_image,
            system_prompt=self.system_prompt,
            user_prompt=self.prompt_template,
            transport=transport,
        )
        if not boxes:
            return [_full_image_candidate(image)]

        if self.max_review_rounds > 0:

            async def _review_page_candidate(
                box: _PageBox,
                history_steps: int,
                round_index: int,
            ) -> _PageBox:
                # Adapter binding the image, model, and transport so the
                # pipeline can call it with positional (box, history, round).
                return await _review_page_box(
                    image=processed_image,
                    current_box=box,
                    history_steps=history_steps,
                    round_index=round_index,
                    model=self.model,
                    transport=transport,
                )

            boxes = await _run_review_pipeline(
                initial_boxes=boxes,
                max_review_rounds=self.max_review_rounds,
                review_box=_review_page_candidate,
                subject_name_singular="page",
                subject_name_plural="pages",
            )

        candidates: list[PageCandidate] = []
        for page_index, box in enumerate(boxes):
            # Boxes were computed on the prepared image; convert them back to
            # the caller's original image before building candidates.
            original_bbox = transform.map_box_to_original(box)
            candidates.append(
                PageCandidate(
                    bbox=original_bbox,
                    polygon=_bbox_to_polygon(original_bbox),
                    metadata={
                        # Position in the returned list vs. the index the
                        # model reported for this box.
                        "page_index": page_index,
                        "detector": "llm",
                        "response_page_index": box.page_index,
                    },
                )
            )
        return candidates or [_full_image_candidate(image)]
async def locate_text_block_bbox_with_llm(
    image: Image.Image,
    block_text: str,
    *,
    block_tag: str,
    model: str,
    transport: LiteLLMTransportLike = None,
    max_review_rounds: int = 0,
) -> BoundingBox | None:
    """Locate the tight bbox of a specific rendered text block via a multimodal LLM.

    :param image: Source page image containing the rendered block.
    :param block_text: Normalized text content of the target block.
    :param block_tag: HDML-style block tag describing the block type.
    :param model: Multimodal model identifier to query through LiteLLM.
    :param transport: Optional LiteLLM transport or transport config.
    :param max_review_rounds: Number of iterative review rounds used to refine
        the initial box.
    :returns: Bounding box in source-image coordinates, or ``None`` when no unique
        matching block can be found.
    :raises ValueError: If ``block_text`` or ``block_tag`` is blank.
    """
    normalized_block_text = block_text.strip()
    if not normalized_block_text:
        message = "block_text must not be blank."
        raise _value_error(message)
    normalized_block_tag = block_tag.strip()
    if not normalized_block_tag:
        message = "block_tag must not be blank."
        raise _value_error(message)

    processed_image, transform = _prepare_detection_image(image)
    # Reuse a ready-made transport as-is; otherwise build one from the config
    # (or from ``None``).
    llm_transport = transport if isinstance(transport, LiteLLMTransport) else LiteLLMTransport(transport)
    box = await _complete_text_block_box(
        model=model,
        image=processed_image,
        block_tag=normalized_block_tag,
        block_text=normalized_block_text,
        transport=llm_transport,
    )
    if box is None:
        logger.info(
            "LLM text-block localization did not find a match for tag=%s.",
            normalized_block_tag,
        )
        return None

    if max_review_rounds > 0:

        async def _review_text_block_candidate(
            review_box: _PageBox,
            history_steps: int,
            round_index: int,
        ) -> _PageBox:
            # Adapter binding the image, block description, model, and
            # transport so the pipeline can call it positionally.
            return await _review_text_block_box(
                image=processed_image,
                current_box=review_box,
                block_tag=normalized_block_tag,
                block_text=normalized_block_text,
                history_steps=history_steps,
                round_index=round_index,
                model=model,
                transport=llm_transport,
            )

        reviewed_boxes = await _run_review_pipeline(
            initial_boxes=[box],
            max_review_rounds=max_review_rounds,
            review_box=_review_text_block_candidate,
            subject_name_singular="text block",
            subject_name_plural="text blocks",
        )
        if not reviewed_boxes:
            return None
        box = reviewed_boxes[0]

    # The box was computed on the prepared image; convert it back to the
    # caller's original image.
    return transform.map_box_to_original(box)
def locate_text_block_bbox_with_llm_sync(
    image: Image.Image,
    block_text: str,
    *,
    block_tag: str,
    model: str,
    transport: LiteLLMTransportLike = None,
    max_review_rounds: int = 0,
) -> BoundingBox | None:
    """Synchronously locate the tight bbox of a specific rendered text block via a multimodal LLM.

    Thin wrapper that runs :func:`locate_text_block_bbox_with_llm` through
    ``run_sync`` with the same arguments.

    :param image: Source page image containing the rendered block.
    :param block_text: Normalized text content of the target block.
    :param block_tag: HDML-style block tag describing the block type.
    :param model: Multimodal model identifier to query through LiteLLM.
    :param transport: Optional LiteLLM transport or transport config.
    :param max_review_rounds: Number of iterative review rounds used to refine
        the initial box.
    :returns: Bounding box in source-image coordinates, or ``None`` when no unique
        matching block can be found.
    :raises ValueError: If ``block_text`` or ``block_tag`` is blank.
    """
    return run_sync(
        locate_text_block_bbox_with_llm(
            image,
            block_text,
            block_tag=block_tag,
            model=model,
            transport=transport,
            max_review_rounds=max_review_rounds,
        )
    )
@dataclass(slots=True)
class AzurePageDetector(PageDetectionBackend):
    """Detect pages from Azure Document Intelligence page output.

    :param endpoint: Azure Document Intelligence endpoint URL.
    :param api_key: Azure API key for the configured resource.
    :param model_id: Azure model ID used for page analysis.
    """

    endpoint: str
    api_key: str
    model_id: str = "prebuilt-layout"

    async def detect(self, image: Image.Image) -> list[PageCandidate]:
        """Detect page candidates from one image using Azure.

        :param image: Source image to analyze.
        :returns: Detected page candidates in reading order. Falls back to a single
            full-image candidate when Azure returns no pages.
        :raises ConfigurationError: If the optional Azure dependency is not installed.
        """
        # Imported lazily so the module stays importable without the optional
        # Azure packages.
        try:
            from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
            from azure.core.credentials import AzureKeyCredential
        except ImportError as exc:  # pragma: no cover - optional extra path
            message = f"Azure page detection requires the `azure` runtime. {install_command_hint('azure')}"
            raise _configuration_error(message) from exc

        # The image is uploaded as RGB JPEG bytes.
        buffer = BytesIO()
        image.convert("RGB").save(buffer, format="JPEG")
        client = DocumentIntelligenceClient(
            endpoint=self.endpoint,
            credential=AzureKeyCredential(self.api_key),
        )
        try:
            image_bytes = buffer.getvalue()

            async def _analyze_document() -> _AzureAnalyzeResultLike:
                # A fresh BytesIO per invocation, so each call made by
                # ``retry_api_call`` reads the payload from the start.
                poller = await client.begin_analyze_document(
                    model_id=self.model_id,
                    body=BytesIO(image_bytes),
                    content_type="application/octet-stream",
                )
                return cast("_AzureAnalyzeResultLike", await poller.result())

            result = await retry_api_call(
                _analyze_document,
                operation_name="Azure page detection request",
                context=f"for model {self.model_id}",
            )
        finally:
            # Always release the client, whether the request succeeded or not.
            await client.close()

        candidates: list[PageCandidate] = []
        for page_index, page in enumerate(result.pages or []):
            polygon = _normalize_azure_page_polygon(page, image=image)
            # A page without a usable polygon yields a candidate with no bbox.
            bbox = _bbox_from_polygon(polygon) if polygon else None
            metadata = {
                "page_index": page_index,
                "page_number": getattr(page, "page_number", page_index + 1),
                "detector": "azure",
            }
            # ``unit`` and ``angle`` are copied only when the page provides them.
            unit = getattr(page, "unit", None)
            if unit is not None:
                metadata["unit"] = str(unit)
            angle = getattr(page, "angle", None)
            if angle is not None:
                metadata["angle"] = float(angle)
            candidates.append(PageCandidate(bbox=bbox, polygon=polygon, metadata=metadata))
        return candidates or [_full_image_candidate(image)]
# Public API of this module.
__all__ = [
    "AzurePageDetector",
    "LLMPageDetector",
    "locate_text_block_bbox_with_llm",
    "locate_text_block_bbox_with_llm_sync",
]