Skip to content

MOR API Client

Bases: BaseAPIClient

Master of Reference (MOR) API Client.

It allows interaction with the MOR system to retrieve information about assets, service events, software, replaced parts, offerings, and DIB.

So far only the offerings search interface is implemented.

To get authorization secrets contact MOR team via a questionary https://forms.office.com/e/NiLxtHSd4z?origin=lprLink or contact Jonas Fritschy personally.

Source code in reportconnectors/api_client/mor/__init__.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class MorAPIClient(BaseAPIClient):
    """
    Master of Reference (MOR) API Client.

    It allows interaction with the MOR system to retrieve information about assets, service events,
    software, replaced parts, offerings, and DIB.

    So far only the offerings search interface is implemented.

    To get authorization secrets contact MOR team via a questionary
    https://forms.office.com/e/NiLxtHSd4z?origin=lprLink
    or contact Jonas Fritschy personally.
    """

    # Set minimum TLS version to TLS 1.2 for secure communication,
    # And set SSL ciphers for compatibility with servers requiring specific security levels.
    # This is required for MOR API communication.
    # I have spent a lot of time debugging TLS issues and this configuration solved them.
    _minimum_tls_version = TLSVersion.TLSv1_2
    _ssl_ciphers = "DEFAULT@SECLEVEL=1"

    @property
    def is_logged(self):
        """
        Checks if the client is logged in.
        """
        time_now = datetime.datetime.now(datetime.timezone.utc)
        _is_logged = (
            bool(self.auth_token)
            and isinstance(self.token_expiration_date, datetime.datetime)
            and (self.token_expiration_date > time_now)
        )
        return _is_logged

    def authenticate(self, client_id: str, client_secret: str, **kwargs) -> bool:
        """
        Authenticates the client using OAuth2 Client Credentials Grant flow.

        It uses the provided client ID and client secret to obtain an access token from the
        authorization server using Basic Auth.

        Client ID and Client Secret can be obtained by contacting MOR team.

        If the authentication is successful, the access token and related information are stored
        in the client's state for future requests. This way the client can make authenticated requests
        to the MOR API.

        Args:
            client_id: Client ID for authentication.
            client_secret: Client Secret for authentication.

        Keyword Args:
            timeout (float): Timeout for the authentication request in seconds. Default is 60.0 seconds.
            content_type (str): Content type for the authentication request. Default is "application/x-www-form-urlencoded".
            token_url (str): URL for the token endpoint. Default is constructed from the base URL.
            grant_type (str): Grant type for the authentication request. Default is "client_credentials".

        Returns:
            True if authentication is successful, otherwise False.
        """

        timeout: float = kwargs.get("timeout", 60.0)
        content_type: str = kwargs.get("content_type", "application/x-www-form-urlencoded")
        token_url: str = kwargs.get("token_url", self._join_url(self.url, "oauth/token"))
        grant_type: str = kwargs.get("grant_type", "client_credentials")

        try:
            response = self._make_request(
                method="POST",
                url=token_url,
                content_type=content_type,
                data={"grant_type": grant_type},
                auth=(client_id, client_secret),
                timeout=timeout,
            )
            response_model = self._decode_response_to_model(response, MorOAuth2TokenResponse)
        except (RequestException, ValueError) as e:
            log.error("Authentication failed.", exc_info=e)
            return False

        self.set_token(
            access_token=response_model.access_token,
            refresh_token=response_model.refresh_token,
            id_token=response_model.id_token,
            expiration_date=response_model.expires_at,
        )
        return self.is_logged

    def set_token(self, access_token: str, **kwargs) -> bool:
        """
        Sets the authorization tokens without the authentication process. It might be used when the
        tokens are obtained from external source.

        Args:
            access_token: Access token to be set.

        Keyword Args:
            refresh_token (str): Refresh token to be set.
            id_token (str): Refresh token to be set.
            client_id (str): Client ID to be set. If not provided, it will be extracted from the token's payload.
            expiration_date (datetime.datetime): Expiration date of the access token. If not provided,
                it will be extracted from the token's payload.

        Returns:
            True if provided token is set and valid. Otherwise, False.
        """
        refresh_token: Optional[str] = kwargs.pop("refresh_token", None)
        id_token: Optional[str] = kwargs.pop("id_token", None)
        client_id: Optional[str] = kwargs.pop("client_id", None)
        expiration_date: Optional[datetime.datetime] = kwargs.pop("expiration_date", None)

        if isinstance(expiration_date, datetime.datetime) and expiration_date < datetime.datetime.now(
            datetime.timezone.utc
        ):
            log.error(f"Token is expired. {expiration_date=}")
        # Set new tokens ...
        self.auth_data[self.KeyNames.ACCESS_TOKEN] = access_token
        self.auth_data[self.KeyNames.EXPIRATION_DATE] = expiration_date
        self.auth_data[self.KeyNames.CLIENT_ID] = client_id
        if refresh_token:
            self.auth_data[self.KeyNames.REFRESH_TOKEN] = refresh_token
        if id_token:
            self.auth_data[self.KeyNames.ID_TOKEN] = id_token
        return True

    def get_preventive_maintenance_offerings(self, serial_number: Union[str, List[str]]) -> OfferingSearchResponse:
        """
        Retrieves preventive maintenance offerings for the given converter serial number(s).

        Args:
            serial_number: A single converter serial number as a string or a list of serial numbers.

        Returns:
            An OfferingSearchResponse object containing the preventive maintenance offerings.
        """
        endpoint = self._build_endpoint_url(interface_name="offerings", endpoint_name="search")
        serial_numbers = [serial_number] if isinstance(serial_number, str) else serial_number
        filters = {
            "filters": [
                {"property": "converterSerialNumber", "values": serial_numbers},
                {"property": "type.name", "values": ["Preventive Maintenance"]},
            ]
        }
        response = self._make_request(method="POST", endpoint=endpoint, json_data=filters)
        offering = self._decode_response_to_model(response=response, model_type=OfferingSearchResponse)
        return offering

    @staticmethod
    def _build_endpoint_url(interface_name: MorInterfaceName, endpoint_name: str) -> str:
        """
        Builds the MOR endpoint. It provides a path to one chosen endpoint.
        """
        return f"api/v1/{interface_name}/{endpoint_name}"

is_logged property

Checks if the client is logged in.

authenticate(client_id, client_secret, **kwargs)

Authenticates the client using OAuth2 Client Credentials Grant flow.

It uses the provided client ID and client secret to obtain an access token from the authorization server using Basic Auth.

Client ID and Client Secret can be obtained by contacting MOR team.

If the authentication is successful, the access token and related information are stored in the client's state for future requests. This way the client can make authenticated requests to the MOR API.

Parameters:

Name Type Description Default
client_id str

Client ID for authentication.

required
client_secret str

Client Secret for authentication.

required

Other Parameters:

Name Type Description
timeout float

Timeout for the authentication request in seconds. Default is 60.0 seconds.

content_type str

Content type for the authentication request. Default is "application/x-www-form-urlencoded".

token_url str

URL for the token endpoint. Default is constructed from the base URL.

grant_type str

Grant type for the authentication request. Default is "client_credentials".

Returns:

Type Description
bool

True if authentication is successful, otherwise False.

Source code in reportconnectors/api_client/mor/__init__.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def authenticate(self, client_id: str, client_secret: str, **kwargs) -> bool:
    """
    Authenticates the client using OAuth2 Client Credentials Grant flow.

    It uses the provided client ID and client secret to obtain an access token from the
    authorization server using Basic Auth.

    Client ID and Client Secret can be obtained by contacting MOR team.

    If the authentication is successful, the access token and related information are stored
    in the client's state for future requests. This way the client can make authenticated requests
    to the MOR API.

    Args:
        client_id: Client ID for authentication.
        client_secret: Client Secret for authentication.

    Keyword Args:
        timeout (float): Timeout for the authentication request in seconds. Default is 60.0 seconds.
        content_type (str): Content type for the authentication request. Default is "application/x-www-form-urlencoded".
        token_url (str): URL for the token endpoint. Default is constructed from the base URL.
        grant_type (str): Grant type for the authentication request. Default is "client_credentials".

    Returns:
        True if authentication is successful, otherwise False.
    """

    timeout: float = kwargs.get("timeout", 60.0)
    content_type: str = kwargs.get("content_type", "application/x-www-form-urlencoded")
    token_url: str = kwargs.get("token_url", self._join_url(self.url, "oauth/token"))
    grant_type: str = kwargs.get("grant_type", "client_credentials")

    try:
        response = self._make_request(
            method="POST",
            url=token_url,
            content_type=content_type,
            data={"grant_type": grant_type},
            auth=(client_id, client_secret),
            timeout=timeout,
        )
        response_model = self._decode_response_to_model(response, MorOAuth2TokenResponse)
    except (RequestException, ValueError) as e:
        log.error("Authentication failed.", exc_info=e)
        return False

    self.set_token(
        access_token=response_model.access_token,
        refresh_token=response_model.refresh_token,
        id_token=response_model.id_token,
        expiration_date=response_model.expires_at,
    )
    return self.is_logged

get_preventive_maintenance_offerings(serial_number)

Retrieves preventive maintenance offerings for the given converter serial number(s).

Parameters:

Name Type Description Default
serial_number Union[str, List[str]]

A single converter serial number as a string or a list of serial numbers.

required

Returns:

Type Description
OfferingSearchResponse

An OfferingSearchResponse object containing the preventive maintenance offerings.

Source code in reportconnectors/api_client/mor/__init__.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def get_preventive_maintenance_offerings(self, serial_number: Union[str, List[str]]) -> OfferingSearchResponse:
    """
    Retrieves preventive maintenance offerings for the given converter serial number(s).

    Args:
        serial_number: A single converter serial number as a string or a list of serial numbers.

    Returns:
        An OfferingSearchResponse object containing the preventive maintenance offerings.
    """
    endpoint = self._build_endpoint_url(interface_name="offerings", endpoint_name="search")
    serial_numbers = [serial_number] if isinstance(serial_number, str) else serial_number
    filters = {
        "filters": [
            {"property": "converterSerialNumber", "values": serial_numbers},
            {"property": "type.name", "values": ["Preventive Maintenance"]},
        ]
    }
    response = self._make_request(method="POST", endpoint=endpoint, json_data=filters)
    offering = self._decode_response_to_model(response=response, model_type=OfferingSearchResponse)
    return offering

set_token(access_token, **kwargs)

Sets the authorization tokens without the authentication process. It might be used when the tokens are obtained from external source.

Parameters:

Name Type Description Default
access_token str

Access token to be set.

required

Other Parameters:

Name Type Description
refresh_token str

Refresh token to be set.

id_token str

Refresh token to be set.

client_id str

Client ID to be set. If not provided, it will be extracted from the token's payload.

expiration_date datetime

Expiration date of the access token. If not provided, it will be extracted from the token's payload.

Returns:

Type Description
bool

True if provided token is set and valid. Otherwise, False.

Source code in reportconnectors/api_client/mor/__init__.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def set_token(self, access_token: str, **kwargs) -> bool:
    """
    Sets the authorization tokens without the authentication process. It might be used when the
    tokens are obtained from external source.

    Args:
        access_token: Access token to be set.

    Keyword Args:
        refresh_token (str): Refresh token to be set.
        id_token (str): Refresh token to be set.
        client_id (str): Client ID to be set. If not provided, it will be extracted from the token's payload.
        expiration_date (datetime.datetime): Expiration date of the access token. If not provided,
            it will be extracted from the token's payload.

    Returns:
        True if provided token is set and valid. Otherwise, False.
    """
    refresh_token: Optional[str] = kwargs.pop("refresh_token", None)
    id_token: Optional[str] = kwargs.pop("id_token", None)
    client_id: Optional[str] = kwargs.pop("client_id", None)
    expiration_date: Optional[datetime.datetime] = kwargs.pop("expiration_date", None)

    if isinstance(expiration_date, datetime.datetime) and expiration_date < datetime.datetime.now(
        datetime.timezone.utc
    ):
        log.error(f"Token is expired. {expiration_date=}")
    # Set new tokens ...
    self.auth_data[self.KeyNames.ACCESS_TOKEN] = access_token
    self.auth_data[self.KeyNames.EXPIRATION_DATE] = expiration_date
    self.auth_data[self.KeyNames.CLIENT_ID] = client_id
    if refresh_token:
        self.auth_data[self.KeyNames.REFRESH_TOKEN] = refresh_token
    if id_token:
        self.auth_data[self.KeyNames.ID_TOKEN] = id_token
    return True