Source code for qianfan.resources.http_client

# Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, AsyncIterator, Iterator, Tuple

import aiohttp
import requests

from qianfan import get_config
from qianfan.resources.typing import QfRequest


[docs]class HTTPClient(object): """ object used to make http request """ def __init__(self, ssl: bool = True, **kwargs: Any) -> None: """ init sync and async request session Args: ssl (bool): whether to use ssl verification in connection, default to True **kwargs (Any): arbitrary arguments """ cfg = get_config() if not ssl or not cfg.SSL_VERIFICATION_ENABLED: self.ssl = False else: self.ssl = True self._session = requests.session()
[docs] def request(self, req: QfRequest) -> requests.Response: """ sync request """ resp = self._session.request( **req.requests_args(), timeout=req.retry_config.timeout, verify=self.ssl, ) return resp
[docs] def request_stream( self, req: QfRequest ) -> Iterator[Tuple[bytes, requests.Response]]: """ sync stream request """ resp = self._session.request( **req.requests_args(), stream=True, timeout=req.retry_config.timeout, verify=self.ssl, ) for line in resp.iter_lines(): yield line, resp
[docs] async def arequest( self, req: QfRequest ) -> Tuple[aiohttp.ClientResponse, aiohttp.ClientSession]: """ async request """ session = aiohttp.ClientSession() timeout = aiohttp.ClientTimeout(total=req.retry_config.timeout) response = await session.request( **req.requests_args(), timeout=timeout, ssl=self.ssl ) return response, session
[docs] async def arequest_stream( self, req: QfRequest ) -> AsyncIterator[Tuple[bytes, aiohttp.ClientResponse]]: """ async stream request """ async with aiohttp.ClientSession() as session: timeout = aiohttp.ClientTimeout(total=req.retry_config.timeout) async with session.request( **req.requests_args(), timeout=timeout, ssl=self.ssl ) as resp: async for line in resp.content: yield line, resp