Source code for medkit.text.spacy.spacy_utils

from __future__ import annotations

# Public API of this module: the converters between medkit objects and spaCy docs.
__all__ = [
    "extract_anns_and_attrs_from_spacy_doc",
    "build_spacy_doc_from_medkit_doc",
    "build_spacy_doc_from_medkit_segment",
]

import warnings
from typing import TYPE_CHECKING, Callable

from spacy.tokens import Doc
from spacy.tokens import Span as SpacySpan
from spacy.tokens.underscore import Underscore
from spacy.util import filter_spans

from medkit.core import Attribute
from medkit.core.text import AnySpan, Entity, Segment, Span, TextDocument, span_utils
from medkit.io._common import get_anns_by_type

if TYPE_CHECKING:
    from spacy import Language

# Name of the spaCy extension (registered on both Doc and Span objects) that
# stores the uid of the medkit object a spaCy object was built from. It is also
# used as the suffix of the per-attribute extensions ("<attr>_medkit_id") that
# store the uid of the original medkit attribute.
_ATTR_MEDKIT_ID = "medkit_id"


def extract_anns_and_attrs_from_spacy_doc(
    spacy_doc: Doc,
    medkit_source_ann: Segment | None = None,
    entities: list[str] | None = None,
    span_groups: list[str] | None = None,
    attrs: list[str] | None = None,
    attribute_factories: dict[str, Callable[[SpacySpan, str], Attribute]] | None = None,
    rebuild_medkit_anns_and_attrs: bool = False,
) -> tuple[list[Segment], dict[str, list[Attribute]]]:
    """Convert the entities and span groups of a spacy doc into medkit annotations.

    Entities of the doc become medkit entities, spans of the span groups become
    medkit segments, and the custom span extensions holding a value become
    medkit attributes.

    Parameters
    ----------
    spacy_doc : Doc
        A Spacy Doc with spans to be converted
    medkit_source_ann : Segment, optional
        Segment used to rebuild spans referencing the original text
    entities : list of str, optional
        Labels of entities to be extracted
        If `None` (default) all new entities will be extracted as annotations
    span_groups : list of str, optional
        Name of span groups to be extracted
        If `None` (default) all new spans will be extracted as annotations
    attrs : list of str, optional
        Name of custom attributes to extract from the annotations that will be
        included. If `None` (default) all the custom attributes will be extracted
    attribute_factories : dict of str to Callable, optional
        Mapping of factories in charge of converting spacy attributes to medkit
        attributes. The key is the attribute label; each factory is called with
        the spacy span and the attribute label.
    rebuild_medkit_anns_and_attrs : bool, default=False
        If True, annotations and attributes carrying a medkit id are converted
        into new annotations/attributes with new ids.
        If False (default), they are not rebuilt and only new annotations and
        attributes are returned.

    Returns
    -------
    annotations : list of Segment
        Segments (and entities) extracted from the spacy Doc object
    attributes_by_ann : dict of str to list of Attribute
        Attributes extracted for each annotation, the key is a medkit uid

    Raises
    ------
    ValueError
        If the spacy doc carries a medkit uid that differs from the uid of
        `medkit_source_ann`
    """
    # the `medkit_id` extensions must be registered before they can be read
    _define_default_extensions()

    factories = {} if attribute_factories is None else attribute_factories

    doc_medkit_id = spacy_doc._.get(_ATTR_MEDKIT_ID)
    if doc_medkit_id and medkit_source_ann and medkit_source_ann.uid != doc_medkit_id:
        raise ValueError(
            "The medkit uid of the Doc object is"
            f" {doc_medkit_id}, the medkit source annotation"
            f" provided has a different uid: {medkit_source_ann.uid}."
        )

    # select what has to be transferred
    selected_ents = _get_ents_by_label(spacy_doc, entities)
    selected_groups = _get_spans_by_label(spacy_doc, span_groups)
    attr_labels = _get_custom_attrs_by_label(rebuild_medkit_anns_and_attrs, attrs)

    annotations = []
    attributes_by_ann = {}

    def _register_attributes(source_span, ann_uid):
        # one medkit attribute per selected extension whose value is not None
        converted = []
        for attr_label in attr_labels:
            value = source_span._.get(attr_label)
            if value is not None:
                factory = factories.get(attr_label)
                if factory:
                    converted.append(factory(source_span, attr_label))
                else:
                    converted.append(Attribute(attr_label, value))
        if converted:
            attributes_by_ann[ann_uid] = converted

    # spacy entities -> medkit entities
    for ent_spacy in selected_ents:
        ann_uid = ent_spacy._.get(_ATTR_MEDKIT_ID)
        if rebuild_medkit_anns_and_attrs or ann_uid is None:
            text, spans = _get_text_and_spans_from_span_spacy(
                span_spacy=ent_spacy, medkit_source_ann=medkit_source_ann
            )
            new_entity = Entity(label=ent_spacy.label_, spans=spans, text=text)
            annotations.append(new_entity)
            ann_uid = new_entity.uid
        _register_attributes(ent_spacy, ann_uid)

    # spacy span groups -> medkit segments
    for group_name, group_spans in selected_groups.items():
        for span_spacy in group_spans:
            # some matchers (for instance in EDS-NLP) create both an entity and
            # a span for each match: such spans were already handled above
            if span_spacy in selected_ents:
                continue

            ann_uid = span_spacy._.get(_ATTR_MEDKIT_ID)
            if rebuild_medkit_anns_and_attrs or ann_uid is None:
                text, new_spans = _get_text_and_spans_from_span_spacy(
                    span_spacy=span_spacy, medkit_source_ann=medkit_source_ann
                )
                # the segment label is the spacy span key (group name); the
                # original label of the spacy span is kept in the metadata
                new_segment = Segment(
                    label=group_name,
                    spans=new_spans,
                    text=text,
                    attrs=[],
                    metadata={"name": span_spacy.label_},
                )
                annotations.append(new_segment)
                ann_uid = new_segment.uid
            _register_attributes(span_spacy, ann_uid)

    return annotations, attributes_by_ann
def build_spacy_doc_from_medkit_doc(
    nlp: Language,
    medkit_doc: TextDocument,
    labels_anns: list[str] | None = None,
    attrs: list[str] | None = None,
    include_medkit_info: bool = True,
) -> Doc:
    """Build a Spacy Doc from the raw text and annotations of a TextDocument.

    Parameters
    ----------
    nlp : Language
        Language object with the loaded pipeline from Spacy
    medkit_doc : TextDocument
        TextDocument to convert
    labels_anns : list of str, optional
        Labels of annotations to include in the spacy document.
        If `None` (default) all the annotations will be included.
    attrs : list of str, optional
        Labels of attributes to add in the annotations that will be included.
        If `None` (default) all the attributes will be added as
        `custom attributes` in each annotation included.
    include_medkit_info : bool, default=True
        If True, medkitID is included as an extension in the Doc object
        to identify the medkit source annotation.
        If False, no information about IDs is included

    Returns
    -------
    Doc
        A Spacy Doc with the selected annotations included.
    """
    # register the extensions used to keep track of the medkit origin
    _define_default_extensions()

    # the raw segment provides the text of the spacy doc
    text_segment = medkit_doc.raw_segment
    anns_by_type = get_anns_by_type(medkit_doc, anns_labels=labels_anns)
    selected_anns = anns_by_type["segments"] + anns_by_type["entities"]

    return build_spacy_doc_from_medkit_segment(
        nlp=nlp,
        segment=text_segment,
        annotations=selected_anns,
        attrs=attrs,
        include_medkit_info=include_medkit_info,
    )
def build_spacy_doc_from_medkit_segment(
    nlp: Language,
    segment: Segment,
    annotations: list[Segment] | None = None,
    attrs: list[str] | None = None,
    include_medkit_info: bool = True,
) -> Doc:
    """Build a Spacy Doc from the text of a Segment and a list of annotations.

    Parameters
    ----------
    nlp : Language
        Language object with the loaded pipeline from Spacy
    segment : Segment
        Segment to convert, this annotation contains the text used to create
        the spacy doc
    annotations : list of Segment, optional
        List of annotations in `segment` to include
    attrs : list of str, optional
        Labels of attributes to add in the annotations that will be included.
        If `None` (default) all the attributes will be added as
        `custom attributes` in each annotation included.
    include_medkit_info : bool, default=True
        If True, medkitID is included as an extension in the Doc object
        to identify the medkit source annotation.
        If False, no information about IDs is included.

    Returns
    -------
    Doc
        A Spacy Doc with the selected annotations included.
    """
    # register the extensions used to keep track of the medkit origin
    _define_default_extensions()

    # only tokenization is needed here, the pipeline components are not run
    doc = nlp.make_doc(segment.text)
    if include_medkit_info:
        doc._.set(_ATTR_MEDKIT_ID, segment.uid)

    # nothing to transfer: the bare doc is the result
    if not annotations:
        return doc

    if attrs is None:
        # no selection given: transfer every attribute found on the annotations
        attrs = set()
        for ann in annotations:
            attrs.update(attr.label for attr in ann.attrs)
    # each transferred attribute needs its span extension declared in spacy
    _define_attrs_extensions(attrs)

    # entities go to `doc.ents`, other segments go to `doc.spans`
    # (Entity is tested first; anything that is neither is ignored)
    medkit_entities = []
    medkit_segments = []
    for ann in annotations:
        if isinstance(ann, Entity):
            medkit_entities.append(ann)
            continue
        if isinstance(ann, Segment):
            medkit_segments.append(ann)

    _add_entities_in_spacy_doc(
        spacy_doc=doc,
        entities=medkit_entities,
        attrs=attrs,
        include_medkit_info=include_medkit_info,
    )
    _add_segments_in_spacy_doc(
        spacy_doc=doc,
        segments=medkit_segments,
        attrs=attrs,
        include_medkit_info=include_medkit_info,
    )
    return doc
def _add_entities_in_spacy_doc(
    spacy_doc: Doc,
    entities: list[Entity],
    attrs: list[str],
    include_medkit_info: bool,
) -> None:
    """Convert medkit entities into spacy spans and overwrite `spacy_doc.ents`.

    Parameters
    ----------
    spacy_doc : Doc
        Spacy doc to modify in place
    entities : list of Entity
        Medkit entities to transfer
    attrs : list of str
        Labels of the medkit attributes to transfer as span extensions
    include_medkit_info : bool
        Whether medkit uids are stored on the created spans
    """
    # intermediate list, needed to detect which entities get discarded
    spacy_entities = [
        _segment_to_spacy_span(
            spacy_doc_target=spacy_doc,
            medkit_segment=medkit_ent,
            attrs=attrs,
            include_medkit_info=include_medkit_info,
        )
        for medkit_ent in entities
    ]

    # since Spacy does not allow overlaps in entities,
    # `filter_spans` suppresses duplicates or overlaps.
    ents_filtered = filter_spans(spacy_entities)

    # overwrite entities in the document, ensure the transfer
    # of the medkit entities
    spacy_doc.ents = ents_filtered

    discarded_str = "--".join(ent.text for ent in spacy_entities if ent not in ents_filtered)
    if discarded_str:
        warnings.warn(
            f"Spacy does not allow entity overlapping, these entities ({discarded_str})"
            " were discarded",
            stacklevel=2,
        )


def _add_segments_in_spacy_doc(
    spacy_doc: Doc,
    segments: list[Segment],
    attrs: list[str],
    include_medkit_info: bool,
) -> None:
    """Convert medkit segments into spacy spans and add them to `spacy_doc.spans`.

    Each span is stored in the span group named after the label of its segment.

    Parameters
    ----------
    spacy_doc : Doc
        Spacy doc to modify in place
    segments : list of Segment
        Medkit segments to transfer
    attrs : list of str
        Labels of the medkit attributes to transfer as span extensions
    include_medkit_info : bool
        Whether medkit uids are stored on the created spans
    """
    for medkit_seg in segments:
        spacy_span = _segment_to_spacy_span(
            spacy_doc_target=spacy_doc,
            medkit_segment=medkit_seg,
            attrs=attrs,
            include_medkit_info=include_medkit_info,
        )
        # it is not necessary to check overlaps,
        # the spans are added directly into the Doc object
        if medkit_seg.label not in spacy_doc.spans:
            spacy_doc.spans[medkit_seg.label] = [spacy_span]
        else:
            spacy_doc.spans[medkit_seg.label].append(spacy_span)


def _get_defined_spacy_attrs(include_medkit_attrs: bool = False) -> list[str]:
    """Return the names of the custom attributes configured in spacy spans.

    Parameters
    ----------
    include_medkit_attrs : bool, default=False
        If True, medkit attrs (attrs transferred from medkit) are included

    Returns
    -------
    list of str
        Name of spans extensions defined in Spacy
    """
    # `get_state` is a spacy function, it returns a tuple of dictionaries
    # with the information of the defined extensions (custom attributes)
    # where ([0]= token_extensions,[1]=span_extensions,[2]=doc_extensions)
    available_attrs = Underscore.get_state()[1].keys()

    # drop the id-holding extensions ("medkit_id" and "<attr>_medkit_id")
    attrs = [attr for attr in available_attrs if not attr.endswith(_ATTR_MEDKIT_ID)]
    if include_medkit_attrs:
        return attrs

    # an attribute transferred from medkit has a companion "<attr>_medkit_id"
    # extension: drop those attributes, keep the ones defined on the spacy side
    return [attr for attr in attrs if f"{attr}_{_ATTR_MEDKIT_ID}" not in available_attrs]


def _define_spacy_span_extension(custom_attr: str) -> None:
    """Register `custom_attr` as a spacy Span extension unless it already exists."""
    if not SpacySpan.has_extension(custom_attr):
        SpacySpan.set_extension(custom_attr, default=None)


def _define_spacy_doc_extension(custom_attr: str) -> None:
    """Register `custom_attr` as a spacy Doc extension unless it already exists."""
    if not Doc.has_extension(custom_attr):
        Doc.set_extension(custom_attr, default=None)


def _define_default_extensions() -> None:
    """Define default attributes to identify origin from medkit."""
    _define_spacy_doc_extension(_ATTR_MEDKIT_ID)
    _define_spacy_span_extension(_ATTR_MEDKIT_ID)


def _define_attrs_extensions(attrs_to_transfer: list[str]) -> None:
    """Define attributes as span extensions in the Spacy context."""
    for attr in attrs_to_transfer:
        # `attr_medkit_id` is the medkit ID of the original attribute
        _define_spacy_span_extension(f"{attr}_{_ATTR_MEDKIT_ID}")
        _define_spacy_span_extension(attr)


def _get_span_boundaries(spans: list[AnySpan]) -> tuple[int, int]:
    """Return boundaries (start, end) from a list of spans.

    The spans are normalized first; when more than one normalized span remains,
    a warning is emitted and the boundaries of the enclosing continuous span
    are returned.
    """
    spans_norm: list[Span] = span_utils.normalize_spans(spans)

    start = spans_norm[0].start
    end = spans_norm[-1].end

    if len(spans_norm) > 1:
        # Spacy does not allow discontinuous spans
        # for compatibility, get a continuous span from the list
        warnings.warn(
            f"These spans {spans} are discontinuous, they were converted"
            f" into its expanded version, from {start} to {end}.",
            stacklevel=2,
        )
    return (start, end)


def _segment_to_spacy_span(
    spacy_doc_target: Doc,
    medkit_segment: Segment,
    attrs: list[str],
    include_medkit_info: bool,
) -> SpacySpan:
    """Create a spacy span given a medkit segment.

    Parameters
    ----------
    spacy_doc_target : Doc
        Spacy doc in which the span is created
    medkit_segment : Segment
        Medkit segment (or entity) to convert
    attrs : list of str
        Labels of the medkit attributes to copy as span extensions
    include_medkit_info : bool
        If True, the uid of the segment and the uids of the copied attributes
        are stored in the corresponding `medkit_id` extensions

    Returns
    -------
    SpacySpan
        The spacy span covering the segment, with its extensions set
    """
    # create a spacy span from characters in the text instead of tokens
    start, end = _get_span_boundaries(medkit_segment.spans)
    # the "name" metadata, when present, takes precedence over the segment label
    span_label = medkit_segment.metadata.get("name", medkit_segment.label)

    # NOTE(review): `char_span` is documented as possibly returning None;
    # "expand" alignment is presumably enough to avoid that here - confirm
    span = spacy_doc_target.char_span(start, end, alignment_mode="expand", label=span_label)
    if include_medkit_info:
        span._.set(_ATTR_MEDKIT_ID, medkit_segment.uid)

    for attr_label in attrs:
        for attr in medkit_segment.attrs.get(label=attr_label):
            value = attr.to_spacy()
            if value is None:
                # in medkit having an attribute, indicates that the attribute exists
                # for the given annotation, we force True as value
                value = True
            # set attributes as extensions
            span._.set(attr.label, value)
            if include_medkit_info:
                span._.set(f"{attr.label}_{_ATTR_MEDKIT_ID}", attr.uid)

    return span


def _get_text_and_spans_from_span_spacy(
    span_spacy: SpacySpan, medkit_source_ann: Segment | None
) -> tuple[str, list[AnySpan]]:
    """Return text and spans depending on the origin of the spacy span.

    Without a medkit source annotation, the text and character offsets of the
    spacy span are used as is. Otherwise they are extracted from the text and
    spans of the source annotation.
    """
    if medkit_source_ann is None:
        text = span_spacy.text
        spans = [Span(span_spacy.start_char, span_spacy.end_char)]
    else:
        # the origin is a medkit annotation
        text, spans = span_utils.extract(
            medkit_source_ann.text,
            medkit_source_ann.spans,
            [(span_spacy.start_char, span_spacy.end_char)],
        )
    return text, spans


def _get_ents_by_label(spacy_doc: Doc, entities: list[str] | None = None) -> list[SpacySpan]:
    """Return the entities of the doc, optionally filtered by label.

    `None` selects every entity. A list (even an empty one) keeps only the
    entities whose label is in it, consistently with how span groups and
    custom attributes are filtered in this module.
    """
    if entities is None:
        return list(spacy_doc.ents)
    return [ent for ent in spacy_doc.ents if ent.label_ in entities]


def _get_spans_by_label(spacy_doc: Doc, span_groups: list[str] | None = None) -> dict[str, list[SpacySpan]]:
    """Return the span groups of the doc by name, optionally filtered by name.

    `None` selects every span group; otherwise only the groups whose name is in
    `span_groups` are kept.
    """
    if span_groups is None:
        return dict(spacy_doc.spans)
    return {label: sp for label, sp in spacy_doc.spans.items() if label in span_groups}


def _get_custom_attrs_by_label(rebuild_medkit_anns_and_attrs: bool, attributes: list[str] | None = None) -> list[str]:
    """Return the names of the span extensions to convert into medkit attributes.

    Extensions transferred from medkit are only considered when
    `rebuild_medkit_anns_and_attrs` is True. When `attributes` is not None,
    only the extensions named in it are kept.
    """
    spacy_attrs = _get_defined_spacy_attrs(rebuild_medkit_anns_and_attrs)
    if attributes is not None:
        # filter attributes by label
        spacy_attrs = [attr for attr in spacy_attrs if attr in attributes]
    return spacy_attrs