Source code for pypipedrive.models.pipelines

from datetime import date
from typing import Dict, Union
from pypipedrive.api import V1, V2
from pypipedrive.utils import warn_endpoint_legacy
from pypipedrive.orm.model import Model
from pypipedrive.orm import fields as F


[docs]class Pipelines(Model): """ Pipelines are essentially ordered collections of stages. See `Pipelines API reference <https://developers.pipedrive.com/docs/api/v1/Pipelines>`_. Get all pipelines. * GET[Cost:10] ``v1/pipelines`` **DEPRECATED** * GET[Cost:5] ``v2/pipelines`` Get one pipeline. * GET[Cost:2] ``v1/pipelines/{id}`` **DEPRECATED** * GET[Cost:1] ``v2/pipelines/{id}`` Get deals conversion rates in pipeline. * GET[Cost:40] ``v1/pipelines/{id}/conversion_statistics`` Get deals in a pipeline. * GET[Cost:20] ``v1/pipelines/{id}/deals`` **DEPRECATED** Get deals movements in pipeline. * GET[Cost:40] ``v1/pipelines/{id}/movement_statistics`` Add a new pipeline. * POST[Cost:10] ``v1/pipelines`` **DEPRECATED** * POST[Cost:5] ``v2/pipelines`` Update a pipeline. * PUT[Cost:10] ``v1/pipelines/{id}`` **DEPRECATED** * PATCH[Cost:5] ``v2/pipelines/{id}`` Delete a pipeline. * DELETE[Cost:6] ``v1/pipelines/{id}`` **DEPRECATED** * DELETE[Cost:3] ``v2/pipelines/{id}`` """ id = F.IntegerField("id", readonly=True) name = F.TextField("name") order_nr = F.IntegerField("order_nr") is_deleted = F.BooleanField("is_deleted", readonly=True) is_deal_propability_enabled = F.BooleanField("is_deal_probability_enabled") add_time = F.DatetimeField("add_time", readonly=True) update_time = F.DatetimeField("update_time", readonly=True) is_selected = F.BooleanField("is_selected", readonly=True) class Meta: entity_name = "pipelines" version = V2
[docs] @classmethod def batch_delete(cls, *args, **kwargs): raise NotImplementedError("Pipelines.batch_delete() is not allowed.")
@warn_endpoint_legacy def _stastistics( self, stats_type: str, start_date: Union[str, date], end_date: Union[str, date], user_id: int = None) -> Dict: """ Helper method for conversion and movements statistics. Args: start_date: Start date in format YYYY-MM-DD. end_date: End date in format YYYY-MM-DD. user_id: Optional user ID to filter statistics. Returns: A dictionary with statistics. """ assert stats_type in ["conversion", "movement"], \ "`stats_type` must be 'conversion' or 'movement'" assert isinstance(start_date, (str, date)), "`start_date` must be str or date" assert isinstance(end_date, (str, date)), "`end_date` must be str or date" if isinstance(start_date, date): start_date = start_date.isoformat() if isinstance(end_date, date): end_date = end_date.isoformat() params = {"start_date": start_date, "end_date": end_date} if user_id is not None: params.update({"user_id": user_id}) uri = f"{self._get_meta('entity_name')}/{self.id}/{stats_type}_statistics" return self.get_api(version=V1).get(uri=uri, params=params).to_dict()
[docs] @warn_endpoint_legacy def conversion_statistics( self, start_date: Union[str, date], end_date: Union[str, date], user_id: int = None) -> Dict: """ Returns all stage-to-stage conversion and pipeline-to-close rates for the given time period. Args: start_date: Start date in format YYYY-MM-DD. end_date: End date in format YYYY-MM-DD. user_id: Optional user ID to filter statistics. Returns: A dictionary with conversion statistics. """ return self._stastistics("conversion", start_date, end_date, user_id)
[docs] def movement_statistics( self, start_date: Union[str, date], end_date: Union[str, date], user_id: int = None) -> Dict: """ Returns statistics for deals movements for the given time period. Args: start_date: Start date in format YYYY-MM-DD. end_date: End date in format YYYY-MM-DD. user_id: Optional user ID to filter statistics. Returns: A dictionary with movement statistics. """ return self._stastistics("movement", start_date, end_date, user_id)