# Source code for mgnipy.emgapi_v2_client.api.genomes.genome_fragment_search

from http import HTTPStatus
from typing import Any

import httpx

from ...client import AuthenticatedClient, Client
from ...types import Response
from ... import errors

from ...models.genome_fragment_search_out import GenomeFragmentSearchOut


def _get_kwargs() -> dict[str, Any]:

    _kwargs: dict[str, Any] = {
        "method": "post",
        "url": "/metagenomics/api/v2/genome-search/",
    }

    return _kwargs


def _parse_response(
    *, client: AuthenticatedClient | Client, response: httpx.Response
) -> GenomeFragmentSearchOut | None:
    """Turn a raw HTTP response into the parsed model when possible.

    A 200 response has its JSON body decoded via
    ``GenomeFragmentSearchOut.from_dict``. Any other status either raises or
    yields ``None``, depending on ``client.raise_on_unexpected_status``.

    Args:
        client: Client whose ``raise_on_unexpected_status`` flag selects the
            behaviour for non-200 responses.
        response: The response to interpret.

    Raises:
        errors.UnexpectedStatus: For a non-200 status when
            ``client.raise_on_unexpected_status`` is truthy.

    Returns:
        The parsed model for a 200 response, otherwise ``None``.
    """
    status = response.status_code

    if status == 200:
        return GenomeFragmentSearchOut.from_dict(response.json())

    # Non-200: stay silent unless the client asked for strict handling.
    if not client.raise_on_unexpected_status:
        return None

    raise errors.UnexpectedStatus(status, response.content)


def _build_response(
    *, client: AuthenticatedClient | Client, response: httpx.Response
) -> Response[GenomeFragmentSearchOut]:
    """Wrap an httpx response in the client's ``Response`` container.

    Args:
        client: Client forwarded to ``_parse_response``.
        response: The raw response whose status code, content and headers
            are copied into the wrapper.

    Returns:
        A ``Response`` carrying the status (as ``HTTPStatus``), the raw
        content, the headers, and the result of ``_parse_response``.
    """
    # Gathered in the same order the wrapper's fields are declared below.
    status = HTTPStatus(response.status_code)
    raw_content = response.content
    header_map = response.headers
    parsed_body = _parse_response(client=client, response=response)

    return Response(
        status_code=status,
        content=raw_content,
        headers=header_map,
        parsed=parsed_body,
    )


def sync_detailed(
    *,
    client: AuthenticatedClient | Client,
) -> Response[GenomeFragmentSearchOut]:
    """Search genomes by short sequence and annotate with MGnify metadata

    Sends the request synchronously through ``client.get_httpx_client()`` and
    returns the full response wrapper built by ``_build_response``.

    Args:
        client: Client used to obtain the httpx client that sends the request.

    Raises:
        errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
        httpx.TimeoutException: If the request takes longer than Client.timeout.

    Returns:
        Response[GenomeFragmentSearchOut]
    """
    kwargs = _get_kwargs()

    response = client.get_httpx_client().request(
        **kwargs,
    )

    return _build_response(client=client, response=response)
def sync(
    *,
    client: AuthenticatedClient | Client,
) -> GenomeFragmentSearchOut | None:
    """Search genomes by short sequence and annotate with MGnify metadata

    Convenience wrapper around ``sync_detailed`` that returns only the
    ``parsed`` attribute of the detailed response.

    Args:
        client: Client forwarded to ``sync_detailed``.

    Raises:
        errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
        httpx.TimeoutException: If the request takes longer than Client.timeout.

    Returns:
        GenomeFragmentSearchOut, or None when the response carried no parsed
        body.
    """
    return sync_detailed(
        client=client,
    ).parsed
async def asyncio_detailed(
    *,
    client: AuthenticatedClient | Client,
) -> Response[GenomeFragmentSearchOut]:
    """Search genomes by short sequence and annotate with MGnify metadata

    Sends the request through ``client.get_async_httpx_client()``, awaits the
    result, and returns the full response wrapper built by
    ``_build_response``.

    Args:
        client: Client used to obtain the async httpx client that sends the
            request.

    Raises:
        errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
        httpx.TimeoutException: If the request takes longer than Client.timeout.

    Returns:
        Response[GenomeFragmentSearchOut]
    """
    kwargs = _get_kwargs()

    response = await client.get_async_httpx_client().request(**kwargs)

    return _build_response(client=client, response=response)
async def asyncio(
    *,
    client: AuthenticatedClient | Client,
) -> GenomeFragmentSearchOut | None:
    """Search genomes by short sequence and annotate with MGnify metadata

    Convenience wrapper around ``asyncio_detailed`` that awaits it and
    returns only the ``parsed`` attribute of the detailed response.

    Args:
        client: Client forwarded to ``asyncio_detailed``.

    Raises:
        errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
        httpx.TimeoutException: If the request takes longer than Client.timeout.

    Returns:
        GenomeFragmentSearchOut, or None when the response carried no parsed
        body.
    """
    return (
        await asyncio_detailed(
            client=client,
        )
    ).parsed