Source code for mgnipy.V2.datasets

import asyncio
import io
import logging
import warnings
import webbrowser
import zlib
from pathlib import Path
from typing import (
    Callable,
    Generator,
    Literal,
    Optional,
)

import aiofiles
import httpx
import ijson
import pandas as pd
from pydantic import (
    DirectoryPath,
    HttpUrl,
)
from requests.exceptions import HTTPError
from skbio.io import read
from tqdm import tqdm as tqdm_sync
from tqdm.asyncio import tqdm as tqdm_async

from mgnipy._models.config import MgnipyConfig
from mgnipy._shared_helpers.async_helpers import get_semaphore
from mgnipy.emgapi_v2_client.api.analyses import (
    analysis_get_mgnify_analysis_with_annotations,
)
from mgnipy.emgapi_v2_client.models.m_gnify_analysis_with_annotations import (
    MGnifyAnalysisWithAnnotations,
)
from mgnipy.V2.core import MGnifier

semaphore = get_semaphore()
BASE_URL = MgnipyConfig().base_url


class MGazine(MGnifyAnalysisWithAnnotations):
    """An extended data class for a MGnify analysis with annotations."""

    def __init__(self, accession: str):
        # mgnifier helper; TODO: handling private data
        self._mgnifier_helper = MGnifier(accession=accession)
        # set endpoint
        self._mgnifier_helper.endpoint_module = (
            analysis_get_mgnify_analysis_with_annotations
        )
        # get the data
        self._mgnifier_helper.get()
        # init with the data
        super().__init__(**self._mgnifier_helper.results[0][0])

    @property
    def url_dict(self) -> dict[str, Optional[str]]:
        """Returns a dict of alias: url for all downloads."""
        return {f["alias"]: f.get("url", None) for f in self.downloads}

    @property
    def downloads_df(self) -> pd.DataFrame:
        """Returns a dataframe of all downloads with columns alias, url, file_type."""
        return pd.DataFrame(self.downloads)

    @property
    def url_list(self) -> list[Optional[str]]:
        """Returns a list of all download urls."""
        return [f.get("url", None) for f in self.downloads]

    def _get_url_by_alias(
        self, alias: str, df: Optional[pd.DataFrame] = None
    ) -> Optional[str]:
        """
        Gets the download url for a given alias.

        Parameters
        ----------
        alias : str
            The alias of the download.
        df : Optional[pd.DataFrame], optional
            The dataframe to query. If None, uses the downloads_df property.

        Returns
        -------
        Optional[str]
            The download url for the given alias, or None if not found.
        """
        df = df if df is not None else self.downloads_df
        try:
            return df.query(f"alias == '{alias}'")["url"].values[0]
        except (IndexError, KeyError) as err:
            raise KeyError(f"Issue getting download url for alias: {alias}") from err

    def _get_alias_by_url(
        self, url: HttpUrl, df: Optional[pd.DataFrame] = None
    ) -> Optional[str]:
        """
        Gets the alias for a given download url.

        Parameters
        ----------
        url : HttpUrl
            The url of the download.
        df : Optional[pd.DataFrame], optional
            The dataframe to query. If None, uses the downloads_df property.

        Returns
        -------
        Optional[str]
            The alias for the given url, or None if not found.
        """
        df = df if df is not None else self.downloads_df
        try:
            return df.query(f"url == '{url}'")["alias"].values[0]
        except (IndexError, KeyError) as err:
            raise KeyError(f"Issue getting alias for url: {url}") from err

    def _get_type_by_alias(
        self, alias: str, df: Optional[pd.DataFrame] = None
    ) -> Optional[str]:
        """
        Gets the file type for a given alias.

        Parameters
        ----------
        alias : str
            The alias of the download.
        df : Optional[pd.DataFrame], optional
            The dataframe to query. If None, uses the downloads_df property.

        Returns
        -------
        Optional[str]
            The file type for the given alias, or None if not found.
        """
        df = df if df is not None else self.downloads_df
        try:
            return df.query(f"alias == '{alias}'")["file_type"].values[0]
        except (IndexError, KeyError) as err:
            raise KeyError(f"Issue getting file type for alias: {alias}") from err

    def _prioritize_alias(
        self,
        alias: Optional[str],
        url: Optional[HttpUrl],
        required: bool = False,
    ) -> tuple[Optional[str], Optional[HttpUrl]]:
        """
        Prioritizes alias over url. If both are provided, alias is used and url is
        ignored with a warning.

        Parameters
        ----------
        alias : Optional[str]
            The alias of the download.
        url : Optional[HttpUrl]
            The url of the download.
        required : bool, optional
            If True, raises an error if neither alias nor url is provided.

        Returns
        -------
        tuple[Optional[str], Optional[HttpUrl]]
            A tuple of (alias, url) where alias is the prioritized alias and url is
            the corresponding url.
        """
        if alias and url:
            warnings.warn(
                "Both `alias` and `url` provided, ignoring `url`.", stacklevel=2
            )
            url = self._get_url_by_alias(alias)
        elif alias and not url:
            url = self._get_url_by_alias(alias)
        elif url and not alias:
            try:
                alias = self._get_alias_by_url(url)
            except KeyError:
                # allow reusing download/adownload for urls outside the downloads list
                alias = None
        if required and not alias and not url:
            raise ValueError("Either `alias` or `url` must be provided.")
        return alias, url

    def stream_tsv(
        self,
        url: str,
        sep: str = "\t",
        chunksize: Optional[int] = None,
        max_skip: int = 5,
        **pd_kwargs,
    ) -> pd.DataFrame | pd.io.parsers.readers.TextFileReader:
        """
        Reads a tsv file from a url and returns a dataframe or an iterator of
        pandas dataframes.

        Handles potential issues with extra header rows (causing
        pd.errors.ParserError) by trying to read the file with increasing skiprows
        until it succeeds or reaches max_skip.

        Parameters
        ----------
        url : str
            The url of the tsv file to stream.
        sep : str, optional
            The separator used in the tsv file. Default is tab.
        chunksize : int, optional
            The number of rows to include in each chunk. Default is None.
        max_skip : int, optional
            The maximum number of rows to skip before raising an error. Default is 5.
        pd_kwargs : dict, optional
            Additional keyword arguments to pass to pandas read_csv.

        Returns
        -------
        pd.DataFrame | pd.io.parsers.readers.TextFileReader
            A dataframe, or an iterator of dataframes if chunksize is given.
        """
        for skip in range(max_skip + 1):
            try:
                return pd.read_csv(
                    url,
                    sep=sep,
                    chunksize=chunksize,
                    skiprows=skip if skip > 0 else None,
                    **pd_kwargs,
                )
            except pd.errors.ParserError:
                continue  # Try next skiprows value
            except Exception as err:
                raise RuntimeError(
                    f"Error reading TSV from {url} with skiprows={skip}"
                ) from err
        raise pd.errors.ParserError(
            f"Failed to parse {url} after skipping up to {max_skip} rows."
        )
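
    # Usage sketch (illustrative): given an MGazine instance `mg`, stream a tabular
    # download in chunks of 1000 rows. The alias "taxonomy_tsv" is hypothetical.
    #
    #     url = mg.url_dict["taxonomy_tsv"]  # hypothetical alias
    #     for chunk in mg.stream_tsv(url, chunksize=1000):
    #         print(chunk.shape)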

    def stream_html(self, url: str, **web_kwargs) -> bool:
        """
        Streams an html file from a url and opens it in the default web browser.

        Parameters
        ----------
        url : str
            The url of the html file to stream.
        web_kwargs : dict, optional
            Additional keyword arguments to pass to webbrowser.open.

        Returns
        -------
        bool
            True if the url was opened successfully, False otherwise.
        """
        return webbrowser.open(url, **web_kwargs)

    def stream_txt(
        self,
        url: str,
        chunksize: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        **httpx_kwargs,
    ) -> str | Generator:
        """
        Streams a txt file from a url.

        Parameters
        ----------
        url : str
            The url of the txt file to stream.
        chunksize : Optional[int], optional
            The number of streamed text pieces to group into each yielded chunk.
            Default is None, which returns the whole file as a single string.
        httpx_client : Optional[httpx.Client], optional
            The httpx client to use. If None, uses the mgnifier helper client.
        httpx_kwargs : dict, optional
            Additional keyword arguments to pass to the httpx client.

        Returns
        -------
        str | Generator[list[str], None, None]
            The whole file as a string if chunksize is None, otherwise an iterator
            of lists of text pieces.
        """
        client = httpx_client or self._mgnifier_helper.httpx_client
        if chunksize is None:
            # load as whole
            response = client.get(url, **httpx_kwargs)
            response.raise_for_status()
            return response.text
        if not (isinstance(chunksize, int) and chunksize > 0):
            raise ValueError("`chunksize` must be a positive integer or None.")

        def _iter_chunks() -> Generator:
            # load in chunks: group streamed text pieces into lists of length chunksize
            with client.stream("GET", url, **httpx_kwargs) as response:
                response.raise_for_status()
                chunk = []
                for piece in response.iter_text():
                    chunk.append(piece)
                    if len(chunk) == chunksize:
                        yield chunk
                        chunk = []
                if chunk:
                    yield chunk

        return _iter_chunks()
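
    # Usage sketch (illustrative): given an MGazine instance `mg`, read a small text
    # download in one go, or iterate grouped text pieces when chunksize is set. The
    # alias "qc_summary_txt" is hypothetical.
    #
    #     text = mg.stream_txt(mg.url_dict["qc_summary_txt"])  # whole file as str
    #     for pieces in mg.stream_txt(mg.url_dict["qc_summary_txt"], chunksize=100):
    #         print(len(pieces))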

    def stream_fasta(self, url: str, **skbio_kwargs) -> Generator:
        """
        Streams a fasta file from a url and returns an iterator of sequence records.

        Parameters
        ----------
        url : str
            The url of the fasta file to stream.
        skbio_kwargs : dict, optional
            Additional keyword arguments to pass to the skbio parsers.
            https://scikit.bio/docs/latest/generated/skbio.io.format.fasta.html

        Returns
        -------
        Generator
            An iterator of skbio sequence objects (header in metadata, plus the
            sequence data).
        """
        return read(url, format="fasta", **skbio_kwargs)

    def stream_gff(self, url: str, **skbio_kwargs) -> Generator:
        """
        Streams a gff file from a url and returns an iterator of parsed gff records.

        Parameters
        ----------
        url : str
            The url of the gff file to stream.
        skbio_kwargs : dict, optional
            Additional keyword arguments to pass to the skbio parser.
            https://scikit.bio/docs/latest/generated/skbio.io.format.gff3.html

        Returns
        -------
        Generator
            An iterator of tuples (seq_id of str type, skbio.metadata.IntervalMetadata).
        """
        return read(url, format="gff3", **skbio_kwargs)

    def stream_biom(self, url: str, **skbio_kwargs) -> Generator:
        """
        Streams a biom file from a url and returns an iterator of parsed biom records.

        Parameters
        ----------
        url : str
            The url of the biom file to stream.
        skbio_kwargs : dict, optional
            Additional keyword arguments to pass to the skbio parser.

        Returns
        -------
        Generator[dict, None, None]
            An iterator of parsed biom records as dictionaries.
        """
        return read(url, format="biom", **skbio_kwargs)
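
    # Usage sketch (illustrative): given an MGazine instance `mg`, iterate parsed
    # records lazily from an skbio-backed stream. The alias "contigs_fasta" is
    # hypothetical; record attributes depend on the skbio constructor used.
    #
    #     for seq in mg.stream_fasta(mg.url_dict["contigs_fasta"]):
    #         print(seq.metadata.get("id"), len(seq))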

    def stream_gzipped(
        self,
        url: str,
        chunksize: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        decode: bool = False,
        encoding: str = "utf-8",
        errors: str = "replace",
        **httpx_kwargs,
    ) -> bytes | str | io.BufferedReader | io.TextIOWrapper:
        """
        Streams a gzipped file from a url and returns a file-like object that can be
        read in chunks. Written using GPT-5.3-Codex.

        Uses httpx for streaming and zlib for decompression.

        Parameters
        ----------
        url : str
            The url of the gzipped file to stream.
        chunksize : int, optional
            The size of each chunk to read from the stream.
        httpx_client : httpx.Client, optional
            The httpx client to use for streaming.
        decode : bool, default False
            Whether to decode the decompressed bytes to a string.
        encoding : str, default "utf-8"
            The encoding to use for decoding bytes to a string.
        errors : str, default "replace"
            The error handling strategy for decoding bytes to a string.
        **httpx_kwargs : dict
            Additional keyword arguments to pass to the httpx client.

        Returns
        -------
        bytes | str | io.BufferedReader | io.TextIOWrapper
            A file-like object that can be read in chunks. If `chunksize` is None,
            returns the full decompressed content as bytes, or as a string depending
            on `decode`.
        """
        # Pick caller-provided client, or fall back to the shared MGnifier client.
        client = httpx_client or self._mgnifier_helper.httpx_client
        logging.debug(
            "stream_gzipped called url=%s chunksize=%s decode=%s",
            url,
            chunksize,
            decode,
        )

        # Backward-compatible mode: no chunksize means full download into memory.
        if chunksize is None:
            logging.debug("Using full-download mode (chunksize=None)")
            # Perform a normal blocking GET.
            r = client.get(url, timeout=None, **httpx_kwargs)
            # Raise if HTTP status is not 2xx.
            r.raise_for_status()
            # Create gzip-compatible streaming decompressor (16 + MAX_WBITS enables gzip header).
            decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
            # Decompress full payload bytes and flush remaining tail bytes.
            data = decompressor.decompress(r.content) + decompressor.flush()
            logging.debug(
                "Full-download mode complete: compressed=%d decompressed=%d",
                len(r.content),
                len(data),
            )
            # Return text if decode=True, else raw bytes.
            return data.decode(encoding, errors=errors) if decode else data

        # Validate chunksize for streaming mode.
        if not isinstance(chunksize, int) or chunksize <= 0:
            raise ValueError("`chunksize` must be a positive integer or None.")

        # Custom raw stream that adapts httpx streamed gzip bytes to a readable file-like object.
        class _HTTPGzipRaw(io.RawIOBase):
            def __init__(self):
                # Open streaming HTTP response context.
                self._cm = client.stream("GET", url, timeout=None, **httpx_kwargs)
                # Enter context manually so this object controls lifecycle.
                self._resp = self._cm.__enter__()
                # Fail fast on non-2xx.
                self._resp.raise_for_status()
                # Iterate compressed bytes in fixed-size network chunks.
                self._iter = self._resp.iter_raw(chunk_size=chunksize)
                # Incremental gzip decompressor.
                self._decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
                # Internal decompressed byte buffer.
                self._buf = bytearray()
                # End-of-stream marker.
                self._eof = False
                # Track whether decompressor.flush() has been called.
                self._flushed = False
                logging.debug("Streaming HTTP/gzip reader initialized")

            def readable(self) -> bool:
                # Required by BufferedReader to know this raw stream supports reading.
                return True

            def _fill(self, need: int) -> None:
                # Keep buffering until we have enough bytes or we hit EOF.
                while len(self._buf) < need and not self._eof:
                    try:
                        # Pull next compressed network chunk.
                        chunk = next(self._iter)
                    except StopIteration:
                        # Network stream finished; flush remaining decompressor tail once.
                        if not self._flushed:
                            tail = self._decomp.flush()
                            if tail:
                                self._buf.extend(tail)
                            self._flushed = True
                        # Mark EOF so future reads stop pulling.
                        self._eof = True
                        logging.debug("Reached end of HTTP stream")
                        break
                    # If chunk is non-empty, incrementally decompress and append output.
                    if chunk:
                        out = self._decomp.decompress(chunk)
                        if out:
                            self._buf.extend(out)

            def readinto(self, b) -> int:
                # If already closed, signal EOF.
                if self.closed:
                    return 0
                # Create writable view into caller-provided buffer.
                mv = memoryview(b)
                # Ensure internal buffer has enough data for requested read.
                self._fill(len(mv))
                # Compute how many bytes we can actually return.
                n = min(len(mv), len(self._buf))
                # No bytes available means EOF.
                if n <= 0:
                    return 0
                # Copy decompressed bytes into caller buffer.
                mv[:n] = self._buf[:n]
                # Remove consumed bytes from internal buffer.
                del self._buf[:n]
                # Return byte count copied.
                return n

            def close(self) -> None:
                # Close HTTP stream context exactly once.
                if not self.closed:
                    try:
                        self._cm.__exit__(None, None, None)
                    finally:
                        super().close()
                    logging.debug("Streaming HTTP/gzip reader closed")

        # Wrap raw stream in BufferedReader for efficient file-like reads.
        raw = _HTTPGzipRaw()
        buffered = io.BufferedReader(raw, buffer_size=chunksize)
        # If decode requested, wrap bytes stream as text stream.
        if decode:
            return io.TextIOWrapper(buffered, encoding=encoding, errors=errors)
        # Default return: binary buffered reader (works with ijson.kvitems).
        return buffered
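
    # Usage sketch (illustrative): given an MGazine instance `mg` and some gzipped
    # download url `some_gz_url` (hypothetical), read the file incrementally without
    # holding the whole decompressed payload in memory.
    #
    #     with mg.stream_gzipped(some_gz_url, chunksize=1 << 16, decode=True) as fh:
    #         for line in fh:
    #             ...  # process one decoded line at a time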

    def stream_jsonl(
        self,
        url: str,
        orient: Optional[
            Literal["records", "split", "index", "columns", "values", "table"]
        ] = None,
        chunksize: Optional[int] = None,
        **pd_kwargs,
    ) -> "pd.DataFrame | pd.io.json._json.JsonReader":
        """
        Streams a jsonl (JSON Lines) file from a url using pandas.

        Parameters
        ----------
        url : str
            The url of the jsonl file to stream.
        orient : Optional[str], optional
            The JSON orientation passed to pandas read_json. Default is None.
        chunksize : Optional[int], optional
            The number of lines to read per chunk. Default is None.
        **pd_kwargs : dict
            Additional keyword arguments to pass to pandas read_json.

        Returns
        -------
        pd.DataFrame | pd.io.json._json.JsonReader
            A dataframe, or an iterator of dataframes if chunksize is given.
        """
        return pd.read_json(
            url, orient=orient, lines=True, chunksize=chunksize, **pd_kwargs
        )

    def stream_json(
        self,
        url: str,
        chunksize: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        **httpx_kwargs,
    ) -> dict | Generator:
        """
        Streams a json file from a url and returns the parsed json as a dictionary,
        or an iterator of entries if chunksize is specified or the file is gzipped.

        Parameters
        ----------
        url : str
            The url of the json file to stream.
        chunksize : Optional[int], optional
            The size of the chunks to read from the stream. Default is None.
        httpx_client : Optional[httpx.Client], optional
            The httpx client to use. If None, uses the mgnifier helper client.
        **httpx_kwargs : dict
            Additional keyword arguments to pass to the httpx client.

        Returns
        -------
        dict | Generator
            The parsed json as a dictionary, or an iterator of top-level
            (key, value) entries if chunksize is specified or the file is gzipped.
        """
        client = httpx_client or self._mgnifier_helper.httpx_client
        is_gzipped = url.endswith(".gz") or url.endswith(".gzip")

        # If no chunksize and not gzipped, do a normal full GET and parse as JSON.
        if chunksize is None and not is_gzipped:
            response = client.get(url, **httpx_kwargs)
            response.raise_for_status()
            return response.json()

        def _iter_entries() -> Generator:
            if not is_gzipped:
                # Streaming json, not zipped: stream text and parse JSON entries one by one.
                with client.stream("GET", url, **httpx_kwargs) as response:
                    response.raise_for_status()
                    yield from ijson.kvitems(response.iter_text(), "")
            else:
                # Gzipped json (with or without chunksize): use stream_gzipped to get
                # a decompressed stream and parse JSON entries one by one.
                gz = self.stream_gzipped(
                    url,
                    chunksize=chunksize,
                    httpx_client=client,
                    decode=True,
                    **httpx_kwargs,
                )
                if isinstance(gz, str):
                    # chunksize=None returns the whole decompressed text;
                    # wrap it so ijson can read it like a file
                    gz = io.StringIO(gz)
                with gz as gzipped_stream:
                    yield from ijson.kvitems(gzipped_stream, "")

        return _iter_entries()
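
    # Usage sketch (illustrative): given an MGazine instance `mg`, small JSON files
    # come back as a dict, while chunked or gzipped JSON comes back as an iterator
    # of top-level (key, value) entries. The urls are hypothetical placeholders.
    #
    #     data = mg.stream_json(small_json_url)  # dict
    #     for key, value in mg.stream_json(big_json_gz_url, chunksize=1 << 16):
    #         print(key)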

    def stream_tree(self, url: str, **skbio_kwargs) -> Generator:
        """
        Streams a tree file from a url and returns an iterator of parsed trees.

        Parameters
        ----------
        url : str
            The url of the tree file to stream.
        skbio_kwargs : dict, optional
            Additional keyword arguments to pass to the skbio parser.

        Returns
        -------
        Generator
            An iterator of parsed trees (skbio TreeNode objects).
        """
        return read(url, format="newick", **skbio_kwargs)

    def _fix_inconsistent_cols(
        self, fields: list[str], pad_to: int = 15
    ) -> list[str] | None:
        """
        Fixes inconsistent columns in a list of strings.

        Parameters
        ----------
        fields : list[str]
            The list of strings to fix.
        pad_to : int, optional
            The number of columns to pad or truncate to. Default is 15.

        Returns
        -------
        list[str] | None
            The fixed list of strings, or None if the fields are invalid.
        """
        # pad
        if len(fields) < pad_to:
            return fields + [""] * (pad_to - len(fields))
        # truncate
        if len(fields) > pad_to:
            return fields[:pad_to]
        return fields

    def _get_streamer(
        self,
        alias: Optional[str] = None,
        url: Optional[HttpUrl] = None,
        chunksize: int = 1000,
        httpx_client: Optional[httpx.Client] = None,
        max_skip: int = 5,
        **kwargs,
    ):
        """
        Gets the appropriate streamer based on the file type of the download.

        Parameters
        ----------
        alias : Optional[str]
            The alias of the download to stream.
        url : Optional[HttpUrl]
            The url of the download to stream.
        chunksize : int, optional
            The size of the chunks to read from the stream. Default is 1000.
        httpx_client : Optional[httpx.Client], optional
            The httpx client to use for making requests. Default is None.
        max_skip : int, optional
            The maximum number of rows to skip before raising an error. Default is 5.
        kwargs : dict
            Additional keyword arguments to pass to the streamer function.

        Returns
        -------
        object
            The stream for the download: a dataframe/reader for tabular formats, a
            generator for text/json/skbio-backed formats, or a callable (for html)
            that opens the file in a web browser.
        """
        client = httpx_client or self._mgnifier_helper.httpx_client
        # get alias/url
        _alias, _url = self._prioritize_alias(alias, url, required=True)
        # return stream based on file type
        file_type = self._get_type_by_alias(_alias)
        if file_type == "tsv":
            if _url.endswith(".gz") or _url.endswith(".gzip"):
                logging.debug(f"tsv file type ends with .gz: {_url}")
                try:
                    return self.stream_tsv(
                        _url,
                        chunksize=chunksize,
                        max_skip=max_skip,
                        compression="gzip",
                        **kwargs,
                    )
                except pd.errors.ParserError as e:
                    logging.error(f"ParserError: {e}")
                    return self.stream_tsv(
                        _url,
                        chunksize=chunksize,
                        max_skip=max_skip,
                        compression="gzip",
                        engine="python",
                        on_bad_lines=self._fix_inconsistent_cols,
                        **kwargs,
                    )
            elif _url.endswith(".txt") or _url.endswith(".tsv"):
                return self.stream_tsv(
                    _url, chunksize=chunksize, max_skip=max_skip, **kwargs
                )
        elif file_type == "csv":
            return self.stream_tsv(
                _url, sep=",", chunksize=chunksize, max_skip=max_skip, **kwargs
            )
        elif file_type == "html":
            return lambda: self.stream_html(_url, **kwargs)
        elif file_type == "txt":
            # TODO: to constants
            return self.stream_txt(
                _url, chunksize=chunksize, httpx_client=client, **kwargs
            )
        elif file_type == "gff":
            return self.stream_gff(_url, **kwargs)
        elif file_type == "biom":
            return self.stream_biom(_url, **kwargs)
        elif file_type == "fasta":
            return self.stream_fasta(_url, **kwargs)
        elif file_type == "tree":
            return self.stream_tree(_url, **kwargs)
        elif file_type == "json":
            return self.stream_jsonl(
                _url, orient="records", chunksize=chunksize, **kwargs
            )
        elif file_type == "other" and ".json" in _url:
            if _url.endswith("json.gz") or _url.endswith("json.gzip"):
                return self.stream_json(
                    _url, chunksize=chunksize, httpx_client=client, **kwargs
                )
            logging.info(
                f"{_alias} is only available for download "
                f'(e.g., `.download("{_alias}")`)'
            )
            # more info
            logging.debug(
                f"Alias: {_alias}\nURL: {_url}\nFile type: {file_type}. "
                "Only '.json' files can be streamed under 'other' type, "
                "otherwise this download is only available for download."
            )
        else:
            raise ValueError(f"Unsupported file type for streaming: {file_type}")

    def stream(
        self,
        *,
        alias: Optional[str] = None,
        url: Optional[HttpUrl] = None,
        chunksize: Optional[int] = None,
        max_skip: int = 5,
        **kwargs,
    ) -> dict:
        """
        Streams a download based on its alias or url. If neither alias nor url is
        provided, streams all downloads. When chunksize is specified, the data is
        loaded lazily in chunks.

        Parameters
        ----------
        alias : Optional[str]
            The alias of the download to stream.
        url : Optional[HttpUrl]
            The url of the download to stream.
        chunksize : Optional[int]
            The size of the chunks to read from the stream.
        max_skip : int, optional
            The maximum number of rows to skip before raising an error. Default is 5.
        **kwargs
            Additional keyword arguments to pass to the streamer function.

        Returns
        -------
        dict
            A dictionary of alias: stream for the requested downloads.
        """
        # get alias/url
        _alias, _url = self._prioritize_alias(alias, url)
        # if neither alias nor url provided, stream all downloads
        if not _alias and not _url:
            aliases = self.downloads_df["alias"].tolist()
        else:
            aliases = [_alias]
        # return dict of alias: stream
        client = self._mgnifier_helper.httpx_client
        # TODO: skip 404 client errors for now
        streams = {}
        for a in aliases:
            try:
                logging.info(f"Setting up stream for alias: {a}")
                streams[a] = self._get_streamer(
                    alias=a,
                    chunksize=chunksize,
                    httpx_client=client,
                    max_skip=max_skip,
                    **kwargs,
                )
            # catch both requests- and httpx-based HTTP errors
            except (HTTPError, httpx.HTTPError) as err:
                logging.error(f"HTTP error for alias {a} and url {_url}: {err}")
                continue  # skip this stream but continue with others
        return streams
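
    # Usage sketch (illustrative): given an MGazine instance `mg`, set up streams
    # for every download at once and consume the one you need; values are per-format
    # streams (see _get_streamer). The alias "taxonomy_tsv" is hypothetical.
    #
    #     streams = mg.stream(chunksize=1000)
    #     tsv_reader = streams.get("taxonomy_tsv")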

    def download(
        self,
        to_dir: DirectoryPath,
        alias: Optional[str] = None,
        *,
        url: Optional[str] = None,
        filename: Optional[str] = None,
        httpx_client: Optional[httpx.Client] = None,
        hide_progress: bool = False,
    ):
        """
        Downloads a file from a url or alias to a specified directory.

        Parameters
        ----------
        to_dir : DirectoryPath
            The directory to download the file to.
        alias : Optional[str], optional
            The alias of the file to download. If not provided, `url` must be
            provided. Default is None.
        url : Optional[str], optional
            The url of the file to download. If not provided, `alias` must be
            provided. Default is None.
        filename : Optional[str], optional
            The name to save the file as. If not provided, the alias will be used
            as the filename. Default is None.
        httpx_client : Optional[httpx.Client], optional
            An optional httpx.Client to use for the download. If not provided, the
            mgnifier helper client is used and closed afterwards. Default is None.
        hide_progress : bool, optional
            Whether to hide the progress bar. Default is False.

        Raises
        ------
        ValueError
            If neither `alias` nor `url` is provided, or if `url` is provided
            without a corresponding `alias` in the downloads and no `filename`.
        """
        # get alias/url
        _alias, _url = self._prioritize_alias(alias, url, required=True)
        # if no alias then need filename
        if not _alias and not filename:
            raise ValueError(
                "If `url` not from downloads, `filename` must be provided since no alias available."
            )
        # make dir if not exists
        to_dir = Path(to_dir)
        to_dir.mkdir(parents=True, exist_ok=True)
        # prep full path
        filepath = to_dir / filename if filename else to_dir / _alias
        # reuse httpx client if provided, otherwise use (and close) the mgnifier client
        owns_client = httpx_client is None
        client = httpx_client or self._mgnifier_helper.httpx_client
        try:
            with client.stream("GET", _url) as response:
                response.raise_for_status()
                total = int(response.headers.get("content-length", 0))
                with (
                    open(filepath, "wb") as f,
                    tqdm_sync(
                        total=total,
                        unit="B",
                        unit_scale=True,
                        desc=f"Downloading {filename or _alias}",
                        ascii=" >=",
                        disable=hide_progress,
                    ) as pbar,
                ):
                    for chunk in response.iter_bytes():
                        f.write(chunk)
                        pbar.update(len(chunk))
        finally:
            # only close the client if it was created here, so a caller-provided
            # client (e.g., from download_all) can be reused across calls
            if owns_client:
                client.close()
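
    # Usage sketch (illustrative): given an MGazine instance `mg`, download a single
    # file by alias into ./data. The alias "taxonomy_tsv" is hypothetical.
    #
    #     mg.download("data", alias="taxonomy_tsv")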

    async def adownload(
        self,
        to_dir: DirectoryPath,
        alias: Optional[str] = None,
        *,
        url: Optional[str] = None,
        filename: Optional[str] = None,
        httpx_aclient: Optional[httpx.AsyncClient] = None,
        hide_progress: bool = False,
    ):
        """
        Asynchronously downloads a file from a url or alias to a specified directory.

        Parameters
        ----------
        to_dir : DirectoryPath
            The directory to download the file to.
        alias : Optional[str], optional
            The alias of the file to download. If not provided, `url` must be
            provided. Default is None.
        url : Optional[str], optional
            The url of the file to download. If not provided, `alias` must be
            provided. Default is None.
        filename : Optional[str], optional
            The name to save the file as. If not provided, the alias will be used
            as the filename. Default is None. Note that if `url` is provided
            without a corresponding `alias` in the downloads, `filename` must be
            provided since there is no alias to use as the filename.
        httpx_aclient : Optional[httpx.AsyncClient], optional
            An optional httpx.AsyncClient to use for the download. If not provided,
            a new client will be created using the mgnifier helper. Default is None.
        hide_progress : bool, optional
            Whether to hide the progress bar. Default is False.
        """
        # get alias/url
        _alias, _url = self._prioritize_alias(alias, url, required=True)
        # if no alias then need filename
        if not _alias and not filename:
            raise ValueError(
                "If `url` not from downloads, `filename` must be provided since no alias available."
            )
        # make dir if not exists
        to_dir = Path(to_dir)
        to_dir.mkdir(parents=True, exist_ok=True)
        # prep full path
        filepath = to_dir / filename if filename else to_dir / _alias

        # arg TODO mixins
        async def _stream_to_file(client: httpx.AsyncClient) -> None:
            async with client.stream("GET", _url) as response:
                response.raise_for_status()
                total = int(response.headers.get("content-length", 0))
                with tqdm_sync(
                    total=total,
                    unit="B",
                    unit_scale=True,
                    desc=f"Downloading {filename or _alias}",
                    ascii="░▒█",
                    disable=hide_progress,
                ) as pbar:
                    async with aiofiles.open(filepath, "wb") as f:
                        async for chunk in response.aiter_bytes():
                            await f.write(chunk)
                            pbar.update(len(chunk))

        # semaphore to limit concurrent downloads, can be adjusted in config
        async with semaphore:
            if httpx_aclient is not None:
                # Do NOT use 'async with' here: the caller owns the client's lifecycle
                await _stream_to_file(httpx_aclient)
            else:
                async with self._mgnifier_helper.httpx_aclient as client:
                    await _stream_to_file(client)

    async def adownload_all(
        self,
        to_dir: DirectoryPath,
        hide_progress: bool = False,
    ):
        """
        Asynchronously downloads all files in the downloads to a specified directory.

        Parameters
        ----------
        to_dir : DirectoryPath
            The directory to download the files to.
        hide_progress : bool, optional
            Whether to hide the progress bars. Default is False.

        Note
        ----
        This method will use the `adownload` method for each file, so it will
        respect the same parameters and behavior for handling aliases, urls,
        filenames, and httpx clients. If you want to customize those parameters for
        each file, you can call `adownload` directly for each file instead of using
        this method.
        """
        logging.debug("Initializing async client once for all downloads")
        async with self._mgnifier_helper.httpx_aclient as client:
            # create tasks for each download
            tasks = [
                self.adownload(
                    to_dir=to_dir,
                    alias=a,
                    httpx_aclient=client,
                    hide_progress=hide_progress,
                )
                for a in self.url_dict
            ]
            # Overall progress bar
            for f in tqdm_async(
                asyncio.as_completed(tasks),
                total=len(tasks),
                desc="Overall Progress",
                ascii=" >=",
                disable=hide_progress,
            ):
                try:
                    await f
                except httpx.ConnectError as ce:
                    # flag and continue with downloads
                    logging.error(
                        f"Connection error occurred while downloading {f}: {ce}"
                    )
                except Exception as e:
                    # flag and continue with downloads
                    logging.error(f"Error occurred while downloading {f}: {e}")
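
    # Usage sketch (illustrative): given an MGazine instance `mg`, fetch every
    # download concurrently from a synchronous script.
    #
    #     import asyncio
    #     asyncio.run(mg.adownload_all("data"))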

    def download_all(
        self,
        to_dir: DirectoryPath,
        hide_progress: bool = False,
    ):
        """
        TODO fix

        Downloads all files in the downloads to a specified directory.

        Parameters
        ----------
        to_dir : DirectoryPath
            The directory to download the files to.
        hide_progress : bool, optional
            Whether to hide the progress bars. Default is False.

        Note
        ----
        This method will use the `download` method for each file, so it will respect
        the same parameters and behavior for handling aliases, urls, filenames, and
        httpx clients. If you want to customize those parameters for each file, you
        can call `download` directly for each file instead of using this method.
        """
        logging.debug("Initializing client once for all downloads")
        with self._mgnifier_helper.httpx_client as client:
            aliases = list(self.url_dict.keys())
            for alias in tqdm_sync(
                aliases,
                total=len(aliases),
                desc="Overall Progress",
                ascii=" >=",
                disable=hide_progress,
            ):
                try:
                    self.download(
                        to_dir=to_dir,
                        alias=alias,
                        httpx_client=client,
                        hide_progress=hide_progress,
                    )
                except httpx.ConnectError as ce:
                    logging.error(
                        "Connection error occurred while downloading %s: %s",
                        alias,
                        ce,
                    )
                except Exception as e:
                    logging.error(
                        "Error occurred while downloading %s: %s",
                        alias,
                        e,
                    )


class MGazineCurator:
    def __init__(
        self,
        *mgazines,
    ):
        if mgazines and all(isinstance(m, MGazine) for m in mgazines):
            self.mgazines = mgazines
        elif mgazines and all(isinstance(m, str) for m in mgazines):
            self.mgazines = [MGazine(m) for m in mgazines]
        else:
            raise ValueError(
                "Invalid input: provide at least one MGazine instance or accession "
                "string, and all inputs must be of the same type."
            )

    def go_terms(self):
        pass
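
# Usage sketch (illustrative): curate several analyses by accession; the accessions
# below are hypothetical placeholders.
#
#     curator = MGazineCurator("MGYA00000001", "MGYA00000002")
#     print(len(curator.mgazines))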


# class DatasetBuilder(MGnifier):
#     def __init__(
#         self,
#         accession: str,
#     ):
#         super().__init__(
#             accession=accession,
#         )
#         self.mpy_module = analysis_get_mgnify_analysis_with_annotations
#
#     def __getitem__(self, key):
#         pass
#
#     def __getattr__(self, name):
#         if name == "annotations":
#             return self
#         else:
#             raise KeyError(f"DatasetBuilder object has no attribute {name}")
#
#     def export(self):
#         pass

# should there be different dataset builders?
# and they can be added to dataset builder as attributes?