import logging
import requests
import pydantic
from typing import Any, Dict, Iterator, List, Optional, Union
from functools import partialmethod
from .exceptions import raise_from_error_response
# Version keys accepted by ``Api(version=...)``.
V1 = "v1"
V2 = "v2"
# Maps each version key to the URL path segment appended to the API host.
VERSIONS = {V1: "v1", V2: "api/v2"}

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# A JSON object as decoded by the standard JSON parser.
_JSON_OBJECT = Dict[str, Any]

# ApiResponse.data type: one object, a list of scalar ids, or a list of objects.
T_DATA = Union[_JSON_OBJECT, List[Union[int, str]], List[_JSON_OBJECT]]
# ApiResponse.additional_data type: same accepted shapes as ``T_DATA``.
T_ADDITIONAL_DATA = T_DATA
# ApiResponse.related_objects type: one object or a list of objects.
T_RELATED_OBJECTS = Union[_JSON_OBJECT, List[_JSON_OBJECT]]
class ApiResponse(pydantic.BaseModel):
    """
    Validated container for the decoded body of an API response.

    All four fields are optional, so a payload that lacks any of them still
    produces an instance.
    """

    # Value of the payload's ``success`` key; ``None`` when absent.
    success: Optional[bool] = None
    # Main payload: a single object, a list of ids or a list of objects.
    data: Optional[T_DATA] = None
    # Extra information sent next to ``data``; defaults to an empty dict.
    additional_data: Optional[T_ADDITIONAL_DATA] = {}
    # Objects referenced by ``data``; defaults to an empty dict.
    related_objects: Optional[T_RELATED_OBJECTS] = {}

    def to_dict(self):
        """
        Dump the model to a plain dict restricted to the ``data`` and
        ``related_objects`` fields.
        """
        exported_fields = {"data", "related_objects"}
        return self.model_dump(include=exported_fields)
class Api:
    """
    Pipedrive API client for https://pipedrive.readme.io/docs/
    documentation.

    Supports the Pipedrive API V1 and V2 versions. By default, the ``Api``
    instance is initialized with the V2 version. When the Model doesn't
    support it, it falls back to the V1 version. However, it is possible to
    force the usage of V1 version:

    >>> from pypipedrive.api import Api, V1
    >>> Api(api_token="<your-token>", version=V1)
    """

    def __init__(self, api_token: Optional[str] = None, version=V2) -> None:
        """
        Initialize the API client with the given version.

        Args:
            api_token: Pipedrive API token. Required: despite the ``None``
                default, omitting it raises ``ValueError``.
            version: API version key (``V1`` or ``V2``). Defaults to V2.

        Raises:
            ValueError: If ``version`` is not a key of ``VERSIONS`` or if
                ``api_token`` is ``None``.
        """
        if version not in VERSIONS:
            raise ValueError(f"Invalid version: {version} (expected {list(VERSIONS.keys())})")
        if api_token is None:
            raise ValueError("`api_token` must be provided")
        # The session has to exist before the token is assigned: the
        # ``api_token`` setter stores the token in ``session.params``.
        self.session = requests.Session()
        self.api_token = api_token
        self.version = VERSIONS[version]
        self.endpoint_url = f"https://api.pipedrive.com/{self.version}/"

    @property
    def api_token(self) -> str:
        """
        Pipedrive API token used in all calls.
        """
        return self._api_token

    @api_token.setter
    def api_token(self, value: str) -> None:
        # Register the token as a session-level query parameter so that it
        # is sent with every request issued through ``self.session``.
        self.session.params.update({"api_token": value})
        self._api_token = value

    def __repr__(self) -> str:
        return f"<pypipedrive.{self.__class__.__name__} version={self.version}>"

    def update_method(self) -> partialmethod:
        """
        Determine the appropriate HTTP method for updating resources based
        on the API version.

        Returns:
            The bound request callable to use for updates: ``self.patch``
            when the client targets V2, ``self.put`` otherwise.
        """
        if self.version == VERSIONS[V2]:
            return self.patch
        return self.put

    def build_url(self, uri: str) -> str:
        """
        Build the full URL for the given endpoint URI.

        Args:
            uri: The endpoint URI, appended as-is to ``endpoint_url``.

        Returns:
            Full URL
        """
        return self.endpoint_url + uri

    def process_response(self, response: requests.Response) -> ApiResponse:
        """
        Process the HTTP response from the Pipedrive API.

        Args:
            response: The HTTP response object.

        Returns:
            The decoded payload wrapped in an ``ApiResponse`` when
            ``response.ok`` is true.

        Raises:
            requests.exceptions.HTTPError: If decoding the body fails for a
                reason other than the body not being JSON.
            Whatever ``raise_from_error_response`` raises when
            ``response.ok`` is false.
        """
        try:
            payload = response.json()
        except requests.exceptions.JSONDecodeError:
            # Body is not JSON: expose the raw bytes under ``data.content``.
            payload = {"data": {"content": response.content}}
        except Exception as exc:
            # Chain the cause so the original traceback is preserved.
            raise requests.exceptions.HTTPError(
                f"API request failed. Status code: {response.status_code}. "
                f"Reason: {response.reason}. Response content: {response.text}. "
                f"Exception: {exc}"
            ) from exc

        if response.ok:
            return ApiResponse(**payload)
        raise_from_error_response(
            code=response.status_code,
            version=self.version,
            error_response=payload,
        )

    def request(
        self,
        method: str,
        uri: str,
        headers: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        files: Optional[Dict[str, Any]] = None,
    ) -> ApiResponse:
        """
        Make a request to the Pipedrive API.

        Args:
            method: HTTP method to use.
            uri: The URI we're attempting to call.
            headers: Extra HTTP headers for this request.
            params: Additional query params to append to the URL as-is.
            json: The JSON payload for a POST/PUT/PATCH/DELETE request.
            data: Request body forwarded to the session as ``data``.
            files: Files to upload.

        Returns:
            The result of ``process_response`` for the HTTP response.
        """
        response = self.session.request(
            method=method,
            url=self.build_url(uri),
            headers=headers,
            # Pass a copy so the caller's dict is never shared with requests.
            params=dict(params or {}),
            json=json,
            data=data,
            files=files,
        )
        logger.info("%s:%s %s", method, response.status_code, uri)
        return self.process_response(response)

    # By using partialmethod, we avoid repeating the "GET", "PUT", "POST",
    # "PATCH" and "DELETE" strings each time the request is made (code cleaner)
    get = partialmethod(request, "GET")
    put = partialmethod(request, "PUT")  # V1 endpoints only
    post = partialmethod(request, "POST")
    patch = partialmethod(request, "PATCH")  # V2 endpoints only
    delete = partialmethod(request, "DELETE")

    # NOTE: the ``ids`` default list is only read, never mutated, so sharing
    # it between calls is harmless. It is kept as-is because an explicit
    # ``ids=None`` is rejected below.
    def batch_delete(
        self,
        uri: str = None,
        ids: List[Union[int, str]] = [],
    ) -> ApiResponse:
        """
        Make a batch DELETE request to the Pipedrive API using `ids`.

        The ids are sent as a single comma-separated ``ids`` query parameter.

        Args:
            uri: API endpoint
            ids: List of IDs to delete.

        Returns:
            The API response.

        Raises:
            AssertionError: If ``uri`` is ``None`` or empty, or if ``ids`` is
                ``None``, not a list, or contains something other than
                integers and strings.
        """
        # Explicit raises instead of ``assert`` statements: the checks must
        # still run when Python is started with ``-O``. The exception type
        # and messages are the ones the assertions produced.
        if uri in [None, ""]:
            raise AssertionError("`uri` must be provided.")
        if ids is None:
            raise AssertionError("`ids` cannot be None.")
        if not isinstance(ids, list):
            raise AssertionError("`ids` must be a list of integers or strings.")
        if not all(isinstance(x, (int, str)) for x in ids):
            raise AssertionError("`ids` must be a list of integers or strings.")
        return self.delete(uri=uri, params={"ids": ",".join(map(str, ids))})

    def iterator(
        self,
        uri: str = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Iterator[ApiResponse]:
        """
        Yield API responses for a paginated endpoint.

        Args:
            uri: Endpoint URI to call.
            params: Initial query parameters (copied internally).

        Yields:
            Each ``ApiResponse`` returned by the API until pagination ends.
            Iteration stops silently at the first response whose ``success``
            is falsy; that response is not yielded.

        Raises:
            ValueError: If ``uri`` is ``None``.
        """
        if uri is None:
            raise ValueError("`uri` must be provided")

        qparams: Dict[str, Any] = dict(params or {})
        while True:
            response: ApiResponse = self.get(uri=uri, params=qparams)
            if not response.success:
                break
            yield response

            # Only a non-empty mapping can carry pagination details. The
            # model also allows ``additional_data`` to be a list, which has
            # no ``.get``.
            additional = response.additional_data
            if not additional or not isinstance(additional, dict):
                break

            # V2 cursor-based pagination
            next_cursor = additional.get("next_cursor")
            if next_cursor:
                qparams["cursor"] = next_cursor
                qparams.pop("start", None)
                continue

            # V1 start/limit pagination
            pagination = additional.get("pagination") or {}
            if not pagination.get("more_items_in_collection"):
                break  # No more pages

            # Prefer the offset reported by the server: if the API applied a
            # smaller limit than the one requested, advancing by the
            # requested limit would skip records.
            next_start = pagination.get("next_start")
            if next_start is None:
                used_limit = qparams.get("limit") or pagination.get("limit") or 100
                try:
                    start = int(qparams.get("start", 0))
                except (TypeError, ValueError):
                    start = 0
                next_start = start + int(used_limit)
            qparams["start"] = next_start

    def all(
        self,
        uri: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> ApiResponse:
        """
        Make a GET request to retrieve all items from a paginated endpoint.

        Args:
            uri: Endpoint URI to call.
            params: Initial query parameters (copied internally).

        Returns:
            A single ``ApiResponse`` whose ``data`` is the list of items
            collected from every page and whose ``related_objects`` is the
            list of collected related objects, or ``None`` when there are
            none.
        """
        data: List[Any] = []
        related_objects: List[Any] = []
        success: bool = True

        for response in self.iterator(uri=uri, params=params):
            success = success and response.success
            # Accumulate ``data`` and ``related_objects`` page by page;
            # ``additional_data`` is pagination metadata and is not merged.
            pairs = (
                (response.data, data),
                (response.related_objects, related_objects),
            )
            for value, bucket in pairs:
                if value in [None, "", [], {}]:
                    continue
                if isinstance(value, list):
                    bucket.extend(value)
                else:
                    bucket.append(value)

        return ApiResponse(
            success=success,
            data=data,
            related_objects=related_objects or None,
        )