Source code for mgnipy.V2.query_set

import logging
import os
from copy import deepcopy
from typing import (
    Any,
    Callable,
    Literal,
    Optional,
)

import pandas as pd

from mgnipy._models.config import MgnipyConfig
from mgnipy._models.CONSTANTS import SupportedEndpoints
from mgnipy.V2.endpoints import (
    ALL_ENDPOINTS,
    ALL_SUPPORTED_RELATIONSHIPS,
)
from mgnipy.V2.mixins import (
    DescribeEmgapiMixin,
    ResultsHandlerMixin,
)
from mgnipy.V2.query_executor import QueryExecutor

ID_PARAM = {
    SupportedEndpoints.BIOMES: "biome_lineage",
    SupportedEndpoints.BIOME: "biome_lineage",
    SupportedEndpoints.STUDIES: "accession",
    SupportedEndpoints.SAMPLES: "accession",
    SupportedEndpoints.RUNS: "accession",
    SupportedEndpoints.ANALYSES: "accession",
    SupportedEndpoints.GENOMES: "accession",
    SupportedEndpoints.ASSEMBLIES: "accession",
    SupportedEndpoints.PUBLICATIONS: "pubmed_id",
    SupportedEndpoints.CATALOGUES: "catalogue_id",
    SupportedEndpoints.STUDY: "accession",
    SupportedEndpoints.SAMPLE: "accession",
    SupportedEndpoints.RUN: "accession",
    SupportedEndpoints.ANALYSIS: "accession",
    SupportedEndpoints.GENOME: "accession",
    SupportedEndpoints.ASSEMBLY: "accession",
    SupportedEndpoints.PUBLICATION: "pubmed_id",
    SupportedEndpoints.CATALOGUE: "catalogue_id",
}


[docs] class QuerySet(ResultsHandlerMixin, DescribeEmgapiMixin): """ Plans, builds, validates and previews queries based on endpoint_module and params of the MGnifier owner. Stores the request urls. if mgnifier owner changes then the QuerySet should be re-instantiated to update the urls and other info. """ def __init__( self, resource: Literal[ "biomes", "biome", "studies", "study", "samples", "sample", "runs", "run", "genomes", "genome", "analyses", "analysis", "assemblies", "assembly", ], *, config: Optional[dict] = None, params: Optional[dict[str, Any]] = None, **kwargs, ): self.config: MgnipyConfig = MgnipyConfig(**config) if config else MgnipyConfig() self._base_url: str = str(self.config.base_url) self._resource = SupportedEndpoints.validate(resource) # params as dict self._params: dict[str, Any] = params or {} # add kwargs to params if provided, prioritizing kwargs if kwargs: self._params.update(kwargs) # default endpoint modules based on resource, can be overridden by owner self._endpoint_module: Callable = ALL_ENDPOINTS[self._resource] # check that params are valid for endpoint module # check params are valid for endpoint # self.validate_endpoint_kwargs(**self._params) self.exec: QueryExecutor = QueryExecutor(self) self.count: Optional[int] = None self.total_pages: Optional[int] = None self.default_page_size: int = 25 # request_urls self.request_urls: Optional[list[str]] = None # results self._results: dict[int, list[dict]] = {} @property def request_url(self) -> str: """ Get the URL for the API request based on the current resource and parameters. This is a single URL that represents the request for the current page of results. Returns ------- str The constructed URL for the API request. """ return self._build_request_url() @property def endpoint_module(self) -> Callable: return self._endpoint_module @endpoint_module.setter def endpoint_module(self, value: Callable): # clone the current instance but with updated endpoint module self._endpoint_module = value # check params are valid for new endpoint module # self.validate_endpoint_kwargs(**self._params) # reset results and urls since endpoint module changed self._results = {} self.request_urls = None @property def params(self) -> dict[str, Any]: return self._params @params.setter def params(self, new_params: dict[str, Any]): self._params = new_params # check that params are valid for endpoint module _ = self.validate_endpoint_kwargs(**self._params) @property def results(self) -> dict[int, list[dict]]: return self._results @property def results_ids(self) -> Optional[list[str]]: """ Get a list of accessions from the retrieved metadata results, if available. Returns ------- list of str or None A list of accession strings if available, otherwise None. """ if self.to_df() is None: return None elif self.id_param_key in self.to_df().columns: return self.to_df()[self.id_param_key].tolist() else: return None @property def resource(self) -> SupportedEndpoints: return self._resource @resource.setter def resource(self, value: str): self._resource = SupportedEndpoints.validate(value) self.endpoint_module = ALL_ENDPOINTS[self._resource] # check that params are valid for new endpoint module _ = self.validate_endpoint_kwargs(**self._params) # reset results and urls since resource changed self._results = {} self.request_urls = None def _is_in_results(self, page_num: int) -> bool: """ Check if results for a specific page number already exist in the results. Parameters ---------- page_num : int The page number to check for existing results. Returns ------- bool True if results for the specified page number exist, False otherwise. """ # get number of pages if not already if (self.count is None) or (self.total_pages is None): self.dry_run(verbose=False) if not (isinstance(page_num, int) and 0 < page_num <= self.total_pages): raise ValueError( f"Invalid page number: {page_num}. " "Pages must be positive integers " f"not exceeding total pages {self.total_pages}." ) return page_num in self._results # PARAM HANDLING def _spawn( self, *, target_resource: Optional[str] = None, params: Optional[dict[str, Any]] = None, **kwargs, ) -> "QuerySet": """ Spawn a new QuerySet instance for a related resource with given parameters. Returns ------- QuerySet A new QuerySet instance with other resource and parameters. """ merged_params = {**(params or {}), **kwargs} resource_override = merged_params.pop("resource", None) return QuerySet( resource=target_resource or resource_override or self.resource, config=self.config, params=merged_params, ) def _clone(self, **param_overrides): """ 'polymorphism-aware, immutable-style clone helper' to create a new instance of the same class with updated parameters. This method is used internally to create new QuerySet instances with updated parameters while preserving the original instance's state. Parameters ---------- **param_overrides Keyword arguments representing the parameters to override in the new instance. These will be merged with the existing parameters, with the provided overrides taking precedence. Returns ------- QuerySet A new instance of the same class with the updated parameters. """ merged_params = {**self.params, **param_overrides} resource_override = merged_params.pop("resource", None) target_resource = ( getattr(self, "RESOURCE", None) or resource_override or self.resource ) new_qs = self.__class__( resource=target_resource, config=self.config.model_dump(mode="json"), params=merged_params, ) new_qs.endpoint_module = self.endpoint_module return new_qs
[docs] def filter( self, **filters, ) -> "QuerySet": """ Update the parameters for the API call to filter results. Parameters ---------- **filters Keyword arguments corresponding to the supported parameters for the current resource. These will be used to filter the results returned by the API. Returns ------- QuerySet A new QuerySet instance with updated parameters for filtering results. """ # make a copy of current instance but with updated params new_qs = self._clone(**filters) return new_qs
[docs] def page_size(self, n: int) -> "QuerySet": """ Set the page size for paginated API calls. Parameters ---------- n : int Returns ------- QuerySet A new QuerySet instance with the updated page size parameter. """ if not isinstance(n, int) or n <= 0: raise ValueError("Page size must be a positive integer.") # make a copy of current instance new_qs = self._clone(page_size=n) return new_qs
@property def base_url(self) -> str: return self._base_url @property def pagination_status(self) -> bool: """ Check if the current resource requires pagination based on its supported keyword arguments. Returns ------- bool True if pagination, False otherwise. """ return ( "page" in self.list_supported_params() and "page_size" in self.list_supported_params() ) def _build_request_params( self, params: Optional[dict[str, Any]] = None, **kwargs ) -> dict[str, Any]: """ Build the parameters for the API request by combining the current parameters with any additional parameters provided. Parameters ---------- params : dict, optional Additional parameters to include in the API request. **kwargs Additional keyword arguments to include in the API request. Returns ------- dict The combined parameters for the API request. """ # combine params with kwargs, with kwargs taking precedence request_params = {**(params or self.params), **kwargs} # if pagination and no page size set, add default page size if self.pagination_status and "page_size" not in request_params: request_params["page_size"] = self.default_page_size return request_params def _build_request_url( self, params: Optional[dict[str, Any]] = None, ) -> str: """ Build a URL for the current resource and parameters using the endpoint module's URL template and the provided parameters. (currently for logging/verbose output only). Parameters ---------- params : dict, optional Parameters to include in the URL. If None, uses self.params. exclude : list of str, optional List of parameter names to exclude from the URL query string. These are typically parameters that are not used for filtering in the API call, such as 'accession' or 'pubmed_id'. Returns ------- str The constructed URL. """ # accept given params or use self.params _params = deepcopy(params or self.params) # combine sub_url and encoded query params path = self.url_path(**_params) # return full url with base url+sub_url+encoded params return os.path.join(self._base_url, path) # preview the request(s) prior to making them (option 1)
[docs] def dry_run(self, *, verbose: bool = True) -> None: """ Plan the API call by validating parameters and estimating the number of pages and records available. Prints the plan details for the user to review before executing the full data retrieval. This method can be called before get() to ensure that the parameters are valid and to understand the scope of the data retrieval. Returns ------- None """ # verbose if verbose: print("Planning the API call with params:") print(self.params) if self.count is not None and self.total_pages is not None: logging.info("Already have count and total_pages from previous dry run") elif not self.pagination_status: self.count = 1 self.total_pages = 1 else: self.exec.get_pageinated_counts() if verbose: print(f"Total pages to retrieve: {self.total_pages}") print(f"Total records to retrieve: {self.count}")
# preview the request(s) prior to making them (option 2)
[docs] def list_urls(self) -> list[str]: """ Generate and return a list of URLs for all the API requests that would be made to retrieve the data based on the current parameters. This allows the user to see exactly which endpoints and query parameters will be used in the API calls before executing them. Returns ------- list of str A list of URLs corresponding to each API request that would be made. """ if self.request_urls is not None: return self.request_urls if not self.pagination_status: self.request_urls = [self._build_request_url()] else: # ensure we have total_pages calculated if self.total_pages is None: self.dry_run() self.request_urls = [] for page in range(1, self.total_pages + 1): _parm = deepcopy(self.params) _parm.update({"page": page}) self.request_urls.append(self._build_request_url(params=_parm)) return self.request_urls
[docs] def explain(self, head: Optional[int] = None) -> None: """ Print example URLs that would be called. Actual requests handled by client. """ _ = self.list_urls() # ensure urls are generated limit = min(head, self.total_pages) if head else self.total_pages for url in self.list_urls()[:limit]: print(url)
# preview the request(s) prior to making them (option 3)
[docs] def preview(self) -> pd.DataFrame: """ Preview the first page of metadata for the current resource and parameters, without retrieving all pages. This allows the user to quickly check the structure and content of the data before deciding to retrieve everything. Returns ------- pd.DataFrame A DataFrame containing the metadata from the specified page of results. Raises ------ RuntimeError If the API call fails or if no data is available to preview. """ first = self.first() return self.to_df({1: first})
# alternatively preview get first
[docs] def first(self) -> dict: """ Retrieve the first page of metadata for the current resource and parameters. Same as preview() but returns the raw dictionary instead of a DataFrame. """ self.exec.get_any_first() return self._results.get(1, [])
[docs] async def afirst(self) -> dict: """ Asynchronously retrieve the first page of metadata for the current resource and parameters. Same as preview() but returns the raw dictionary instead of a DataFrame. """ await self.exec.aget_any_first() return self._results.get(1, [])
# dunder methods def __str__(self): """ Return a string representation of the MGnifier instance, summarizing key configuration and state. Returns ------- str Human-readable summary of the instance. """ cls = type(self) class_path = f"{cls.__module__}.{cls.__qualname__}" return ( f"MGnifier instance for resource: {self.resource}\n" f"I.e., {class_path}\n" f"----------------------------------------\n" f"Base URL: {self._base_url}\n" f"Parameters: {self.params}\n" f"Endpoint module: {self.endpoint_module.__name__ or 'None'}\n" f"Example request URL: {self._build_request_url()}\n" f"Returns paginated results: {self.pagination_status}\n" ) def __call__(self, **kwargs): return self.filter(**kwargs) @property def id_param_key(self) -> str: try: return ID_PARAM[self.resource] except KeyError: raise AttributeError( f"Resource {self.resource} does not have a defined access identifier key." ) from None @property def identifier(self) -> Optional[str]: """ Get the identifier value from the parameters based on the resource type. This is used for constructing URLs for related resources. Returns ------- str or None The identifier value corresponding to the resource type, or None if not available. """ try: return self.params[self.id_param_key] except KeyError: raise AttributeError( f"Identifier key '{self.id_param_key}' not found in parameters for resource '{self.resource}'." ) from None def _resolve_id_param(self, key: int | str) -> dict: """ Resolve the identifier parameter for a related resource based on the provided key, which can be either an index or a string identifier. This method checks if the key is a valid index in the results or a valid identifier string, and returns the corresponding parameter dictionary for accessing the related resource. Parameters ---------- key : int or str An integer index referring to the position in the results, or a string identifier (such as an accession or biome lineage) that exists in the results. Returns ------- dict A dictionary containing the identifier parameter key and its corresponding value, which can be used to access the related resource. For example, {"accession": "MGYS00001234"} or {"biome_lineage": "root"}. """ # allow index-based access if self.results_ids is not None and isinstance(key, int): return {self.id_param_key: self.results_ids[key]} # or by accession/biome_lineage/ids string directly if self.results_ids is not None and key in self.results_ids: return {self.id_param_key: key} raise KeyError( f"Invalid key: {key}. " "Key must be an integer index, or a valid id string. " f"Accession/id/biome_lineage must exist in`.results_ids`: {self.results_ids}" ) # RELATIONSHIP HANDLING
[docs] def list_relationships(self) -> list[str]: if self.resource in ALL_SUPPORTED_RELATIONSHIPS: return [ endpoint.value for endpoint in ALL_SUPPORTED_RELATIONSHIPS[self.resource] ] else: return []
[docs] def describe_relationships(self): pass # TODO