Skip to content

API documentation

CloudWatcher

A base class for CloudWatch managers

Source code in cloudwatcher/cloudwatcher.py
class CloudWatcher:
    """
    A base class for CloudWatch managers.

    Holds a boto3 client for the requested AWS service, configured with the
    provided credentials and region.
    """

    def __init__(
        self,
        service_name: str,
        aws_region_name: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
    ) -> None:
        """
        Initialize CloudWatcher

        Args:
            service_name (str): The name of the AWS service to create a client for
            aws_region_name (Optional[str]): The AWS region name. Defaults to 'us-east-1'.
            aws_access_key_id (Optional[str]): The AWS access key ID.
            aws_secret_access_key (Optional[str]): The AWS secret access key.
            aws_session_token (Optional[str]): The AWS session token.
        """
        # Fall back to a default region when none is provided
        self.aws_region_name = aws_region_name or "us-east-1"
        self.service_name = service_name
        # NOTE: the previous annotation `boto3.Session.client` named a method,
        # not a type; boto3 clients are generated dynamically, so the
        # attribute is left unannotated.
        self.client = boto3.client(
            service_name=self.service_name,
            region_name=self.aws_region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
        )

__init__(service_name, aws_region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)

Initialize CloudWatcher

Parameters:

Name Type Description Default
service_name str

The name of the service to use

required
aws_region_name Optional[str]

The AWS region name.

None
aws_access_key_id Optional[str]

The AWS access key ID.

None
aws_secret_access_key Optional[str]

The AWS secret access key.

None
aws_session_token Optional[str]

The AWS session token.

None
Source code in cloudwatcher/cloudwatcher.py
def __init__(
    self,
    service_name: str,
    aws_region_name: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
) -> None:
    """
    Initialize CloudWatcher

    Args:
        service_name (str): The name of the AWS service to create a client for
        aws_region_name (Optional[str]): The AWS region name. Defaults to 'us-east-1'.
        aws_access_key_id (Optional[str]): The AWS access key ID.
        aws_secret_access_key (Optional[str]): The AWS secret access key.
        aws_session_token (Optional[str]): The AWS session token.
    """
    # Fall back to a default region when none is provided
    self.aws_region_name = aws_region_name or "us-east-1"
    self.service_name = service_name
    # NOTE: the previous annotation `boto3.Session.client` named a method,
    # not a type; boto3 clients are generated dynamically, so the
    # attribute is left unannotated.
    self.client = boto3.client(
        service_name=self.service_name,
        region_name=self.aws_region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )

LogWatcher

Bases: CloudWatcher

A class for AWS CloudWatch log events retrieval and parsing

Source code in cloudwatcher/logwatcher.py
class LogWatcher(CloudWatcher):
    """
    A class for AWS CloudWatch log events retrieval and parsing
    """

    def __init__(
        self,
        log_group_name: str,
        log_stream_name: str,
        start_token: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        aws_region_name: Optional[str] = None,
    ) -> None:
        """
        Initialize LogWatcher

        Args:
            log_group_name (str): The name of the log group
            log_stream_name (str): The name of the log stream
            start_token (Optional[str]): The token to use for the next query
            aws_access_key_id (Optional[str]): The AWS access key ID
            aws_secret_access_key (Optional[str]): The AWS secret access key
            aws_session_token (Optional[str]): The AWS session token
            aws_region_name (Optional[str]): The AWS region name
        """
        super().__init__(
            service_name="logs",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            aws_region_name=aws_region_name,
        )
        self.log_group_name = log_group_name
        self.log_stream_name = log_stream_name
        self.start_token = start_token

    def __repr__(self) -> str:
        """
        Return a string representation of the object

        Returns:
            str: The string representation of the object
        """
        return f"LogWatcher('{self.log_group_name}/{self.log_stream_name}')"

    def check_log_exists(self) -> bool:
        """
        Check if the log stream exists

        Returns:
            bool: True if the log stream exists, False otherwise
        """
        try:
            response = self.client.describe_log_streams(
                logGroupName=self.log_group_name,
                logStreamNamePrefix=self.log_stream_name,
            )
            # a non-empty 'logStreams' list means at least one stream matches
            return bool(response["logStreams"])
        except Exception as e:
            # best-effort check: report the failure but do not propagate it
            _LOGGER.error(f"Error checking if log stream exists: {e}")
            return False

    def _get_events(self, query_kwargs: Dict[str, Any]) -> LogEventsList:
        """
        Get events from CloudWatch and update the arguments
        for the next query with 'nextForwardToken'

        Args:
            query_kwargs (Dict[str, Any]): The query arguments
        Returns:
            LogEventsList: The list of log events
        """
        response = self.client.get_log_events(**query_kwargs)
        log_events_list = LogEventsList.from_response(response)
        # side effect: advance the caller's kwargs to the next page
        query_kwargs.update({"nextToken": log_events_list.next_forward_token})
        return log_events_list

    def stream_cloudwatch_logs(
        self, events_limit: int = 1000, max_retry_attempts: int = 5
    ) -> Generator[LogEventsList, None, None]:
        """
        A generator that retrieves desired number of log events per iteration

        Args:
            events_limit (int): The number of events to retrieve per iteration.
            max_retry_attempts (int): The number of retry attempts.
        Yields:
            LogEventsList: the next batch of log events
        """
        query_kwargs = dict(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            limit=events_limit,
            startFromHead=True,  # read the stream oldest-first
        )
        if self.start_token:
            # resume from a previously returned pagination token
            query_kwargs.update({"nextToken": self.start_token})
        _LOGGER.debug(
            f"Retrieving log events from: {self.log_group_name}/{self.log_stream_name}"
        )
        # NOTE: _get_events mutates query_kwargs with the next token, so each
        # call below advances through the stream.
        log_events_list = self._get_events(query_kwargs)
        yield log_events_list
        while log_events_list:
            log_events_list = self._get_events(query_kwargs)
            retry_attempts = 0
            # an empty batch may mean the logs have not arrived yet; retry a
            # bounded number of times before yielding the (possibly empty) batch
            while not log_events_list and max_retry_attempts > retry_attempts:
                log_events_list = self._get_events(query_kwargs)
                retry_attempts += 1
                _LOGGER.debug(
                    f"Received empty log events list. Retry attempt: {retry_attempts}"
                )
            yield log_events_list

    def stream_formatted_logs(
        self,
        events_limit: int = 1000,
        max_retry_attempts: int = 5,
        sep: str = "<br>",
    ) -> Generator[Tuple[str, Optional[str]], None, None]:
        """
        A generator that yields formatted log events

        Args:
            events_limit (int): The number of events to retrieve per iteration.
            max_retry_attempts (int): The number of retry attempts.
            sep (str): The separator to use between log events.
        Yields:
            Tuple[str, Optional[str]]: the joined formatted log events and
            the next pagination token
        """
        for log_events_list in self.stream_cloudwatch_logs(
            events_limit=events_limit,
            max_retry_attempts=max_retry_attempts,
        ):
            formatted_log_events = log_events_list.format_messages().events
            yield sep.join(
                [event.message for event in formatted_log_events]
            ), log_events_list.next_forward_token

    def return_formatted_logs(
        self, events_limit: int = 1000, max_retry_attempts: int = 5
    ) -> Tuple[str, Optional[str]]:
        """
        Retrieve all formatted log events and the final pagination token

        Args:
            events_limit (int): The number of events to retrieve per iteration.
            max_retry_attempts (int): The number of retry attempts.
        Returns:
            Tuple[str, Optional[str]]: the formatted log events joined with
            newlines, and the token to resume from (None if nothing streamed)
        """
        messages = []
        next_token = None
        for log_events_list in self.stream_cloudwatch_logs(
            events_limit=events_limit, max_retry_attempts=max_retry_attempts
        ):
            formatted_log_events_list = log_events_list.format_messages()
            messages.extend(
                event.message for event in formatted_log_events_list.events
            )
            # track the most recent token inside the loop; the previous code
            # read the loop variable after the loop, which raised NameError
            # when the stream yielded no batches
            next_token = formatted_log_events_list.next_forward_token
        # join once so messages from different batches are newline-separated too
        return "\n".join(messages), next_token

    def save_log_file(self, file_path: str) -> None:
        """
        Save the log file to the specified path

        Args:
            file_path (str): The path to save the log file to.
        """
        logs, _ = self.return_formatted_logs()
        with open(file_path, "w") as f:
            f.write(logs)
        _LOGGER.info(
            f"Logs '{self.log_group_name}/{self.log_stream_name}' saved to: {file_path}"
        )

__init__(log_group_name, log_stream_name, start_token=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, aws_region_name=None)

Initialize LogWatcher

Parameters:

Name Type Description Default
log_group_name str

The name of the log group

required
log_stream_name str

The name of the log stream

required
start_token Optional[str]

The token to use for the next query

None
aws_access_key_id Optional[str]

The AWS access key ID

None
aws_secret_access_key Optional[str]

The AWS secret access key

None
aws_session_token Optional[str]

The AWS session token

None
aws_region_name Optional[str]

The AWS region name

None
Source code in cloudwatcher/logwatcher.py
def __init__(
    self,
    log_group_name: str,
    log_stream_name: str,
    start_token: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    aws_region_name: Optional[str] = None,
) -> None:
    """
    Set up a LogWatcher for a single CloudWatch log stream.

    Args:
        log_group_name (str): name of the log group to watch
        log_stream_name (str): name of the log stream to watch
        start_token (Optional[str]): pagination token to resume from
        aws_access_key_id (Optional[str]): AWS access key ID
        aws_secret_access_key (Optional[str]): AWS secret access key
        aws_session_token (Optional[str]): AWS session token
        aws_region_name (Optional[str]): AWS region name
    """
    # delegate client construction to the base class, targeting the
    # CloudWatch Logs service
    super().__init__(
        service_name="logs",
        aws_region_name=aws_region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )
    self.start_token = start_token
    self.log_stream_name = log_stream_name
    self.log_group_name = log_group_name

__repr__()

Return a string representation of the object

Returns:

Name Type Description
str str

The string representation of the object

Source code in cloudwatcher/logwatcher.py
def __repr__(self) -> str:
    """
    Build a human-readable representation of this watcher.

    Returns:
        str: the representation, including the log group and stream names
    """
    stream_path = f"{self.log_group_name}/{self.log_stream_name}"
    return f"LogWatcher('{stream_path}')"

check_log_exists()

Check if the log stream exists

Returns:

Name Type Description
bool bool

True if the log stream exists, False otherwise

Source code in cloudwatcher/logwatcher.py
def check_log_exists(self) -> bool:
    """
    Check if the log stream exists

    Returns:
        bool: True if the log stream exists, False otherwise
    """
    try:
        response = self.client.describe_log_streams(
            logGroupName=self.log_group_name,
            logStreamNamePrefix=self.log_stream_name,
        )
        # a non-empty 'logStreams' list means at least one stream matches;
        # bool(...) replaces the redundant `True if ... else False`
        return bool(response["logStreams"])
    except Exception as e:
        # best-effort check: report the failure but do not propagate it
        _LOGGER.error(f"Error checking if log stream exists: {e}")
        return False

return_formatted_logs(events_limit=1000, max_retry_attempts=5)

Retrieves all formatted log events and the final pagination token

Parameters:

Name Type Description Default
events_limit int

The number of events to retrieve per iteration.

1000
max_retry_attempts int

The number of retry attempts.

5

Returns:

Type Description
Tuple[str, Optional[str]]

Tuple[str, str]: The list of formatted log events and the next token

Source code in cloudwatcher/logwatcher.py
def return_formatted_logs(
    self, events_limit: int = 1000, max_retry_attempts: int = 5
) -> Tuple[str, Optional[str]]:
    """
    Retrieve all formatted log events and the final pagination token

    Args:
        events_limit (int): The number of events to retrieve per iteration.
        max_retry_attempts (int): The number of retry attempts.
    Returns:
        Tuple[str, Optional[str]]: the formatted log events joined with
        newlines, and the token to resume from (None if nothing streamed)
    """
    messages = []
    next_token = None
    for log_events_list in self.stream_cloudwatch_logs(
        events_limit=events_limit, max_retry_attempts=max_retry_attempts
    ):
        formatted_log_events_list = log_events_list.format_messages()
        messages.extend(event.message for event in formatted_log_events_list.events)
        # track the most recent token inside the loop; the previous code read
        # the loop variable after the loop, which raised NameError when the
        # stream yielded no batches
        next_token = formatted_log_events_list.next_forward_token
    # join once so messages from different batches are newline-separated too
    return "\n".join(messages), next_token

save_log_file(file_path)

Save the log file to the specified path

Parameters:

Name Type Description Default
file_path str

The path to save the log file to.

required
Source code in cloudwatcher/logwatcher.py
def save_log_file(self, file_path: str) -> None:
    """
    Write all formatted log events for the watched stream to a file.

    Args:
        file_path (str): destination path for the log file
    """
    formatted_logs, _ = self.return_formatted_logs()
    with open(file_path, "w") as log_file:
        log_file.write(formatted_logs)
    _LOGGER.info(
        f"Logs '{self.log_group_name}/{self.log_stream_name}' saved to: {file_path}"
    )

stream_cloudwatch_logs(events_limit=1000, max_retry_attempts=5)

A generator that retrieves desired number of log events per iteration

Parameters:

Name Type Description Default
events_limit int

The number of events to retrieve per iteration.

1000
max_retry_attempts int

The number of retry attempts.

5

Returns:

Type Description
Generator[LogEventsList, None, None]

List[Event]: The list of log events

Source code in cloudwatcher/logwatcher.py
def stream_cloudwatch_logs(
    self, events_limit: int = 1000, max_retry_attempts: int = 5
) -> Generator[LogEventsList, None, None]:
    """
    A generator that retrieves desired number of log events per iteration

    Args:
        events_limit (int): The number of events to retrieve per iteration.
        max_retry_attempts (int): The number of retry attempts for an
            iteration that returns no events.
    Yields:
        LogEventsList: the next batch of log events
    """
    query_kwargs = dict(
        logGroupName=self.log_group_name,
        logStreamName=self.log_stream_name,
        limit=events_limit,
        startFromHead=True,  # read the stream oldest-first
    )
    if self.start_token:
        # resume from a previously returned pagination token
        query_kwargs.update({"nextToken": self.start_token})
    _LOGGER.debug(
        f"Retrieving log events from: {self.log_group_name}/{self.log_stream_name}"
    )
    # NOTE: _get_events mutates query_kwargs, storing the 'nextToken' for the
    # following request, so each call below advances through the stream.
    log_events_list = self._get_events(query_kwargs)
    yield log_events_list
    while log_events_list:
        log_events_list = self._get_events(query_kwargs)
        retry_attempts = 0
        # an empty batch may mean the logs have not arrived yet; retry a
        # bounded number of times before yielding the (possibly empty) batch
        while not log_events_list and max_retry_attempts > retry_attempts:
            log_events_list = self._get_events(query_kwargs)
            retry_attempts += 1
            _LOGGER.debug(
                f"Received empty log events list. Retry attempt: {retry_attempts}"
            )
        yield log_events_list

stream_formatted_logs(events_limit=1000, max_retry_attempts=5, sep='<br>')

A generator that yields formatted log events

Parameters:

Name Type Description Default
events_limit int

The number of events to retrieve per iteration.

1000
max_retry_attempts int

The number of retry attempts.

5
sep str

The separator to use between log events.

'<br>'

Returns:

Type Description
Generator[Tuple[str, Optional[str]], None, None]

Tuple[List[str], str]: The list of formatted log events and the next token

Source code in cloudwatcher/logwatcher.py
def stream_formatted_logs(
    self,
    events_limit: int = 1000,
    max_retry_attempts: int = 5,
    sep: str = "<br>",
) -> Generator[Tuple[str, Optional[str]], None, None]:
    """
    Yield each batch of formatted log events joined into a single string.

    Args:
        events_limit (int): number of events to retrieve per iteration
        max_retry_attempts (int): number of retry attempts
        sep (str): separator placed between log events
    Yields:
        Tuple[str, Optional[str]]: the joined formatted messages and the
        next pagination token
    """
    batches = self.stream_cloudwatch_logs(
        events_limit=events_limit,
        max_retry_attempts=max_retry_attempts,
    )
    for batch in batches:
        formatted = batch.format_messages()
        joined = sep.join(event.message for event in formatted.events)
        yield joined, batch.next_forward_token

MetricWatcher

Bases: CloudWatcher

A class for AWS CloudWatch metric retrieval and parsing

Source code in cloudwatcher/metricwatcher.py
class MetricWatcher(CloudWatcher):
    """
    A class for AWS CloudWatch metric retrieval and parsing
    """

    def __init__(
        self,
        namespace: str,
        dimensions_list: List[Dimension],
        metric_name: str,
        metric_id: str,
        metric_unit: Optional[str] = None,
        metric_description: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        aws_region_name: Optional[str] = None,
    ) -> None:
        """
        Initialize MetricWatcher

        Args:
            namespace (str): the namespace of the metric
            dimensions_list (List[Dimension]): the dimensions of the metric
            metric_name (str): the name of the metric
            metric_id (str): the ID of the metric
            metric_unit (Optional[str]): the unit of the metric
            metric_description (Optional[str]): a description of the metric
            aws_access_key_id (Optional[str]): the AWS access key ID
            aws_secret_access_key (Optional[str]): the AWS secret access key
            aws_session_token (Optional[str]): the AWS session token
            aws_region_name (Optional[str]): the AWS region name
        """
        super().__init__(
            service_name="cloudwatch",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            aws_region_name=aws_region_name,
        )
        self.namespace = namespace
        self.dimensions_list = dimensions_list
        self.metric_name = metric_name
        self.metric_id = metric_id
        self.metric_unit = metric_unit
        self.metric_description = metric_description
        # a separate EC2 resource is needed to inspect instance state/uptime
        self.ec2_resource = boto3.resource(
            service_name="ec2",
            region_name=self.aws_region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
        )

    def query_ec2_metrics(
        self,
        days: int,
        hours: int,
        minutes: int,
        stat: str,
        period: int,
    ) -> Optional[Dict]:
        """
        Query EC2 metrics

        Args:
            days (int): how many days to subtract from the current date to determine
                the metric collection start time
            hours (int): how many hours to subtract from the current time to determine
                the metric collection start time
            minutes (int): how many minutes to subtract from the current time to
                determine the metric collection start time
            stat (str): the statistic to query
            period (int): the period of the metric, in seconds

        Returns:
            Optional[Dict]: the response from the query, check the structure of the
            response [here](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch.html#CloudWatch.Client.get_metric_data) # noqa: E501
        """
        if self.namespace is None:
            raise ValueError(f"Invalid metric namespace to watch: {self.namespace}")
        # compute the query window ending now (UTC)
        now = datetime.datetime.now(pytz.utc)
        start_time = now - datetime.timedelta(days=days, hours=hours, minutes=minutes)

        def _time(x: datetime.datetime):
            """
            Format a datetime object for logging

            Args:
                x (datetime.datetime): the datetime object to format
            """
            return x.strftime("%Y-%m-%d %H:%M:%S")

        _LOGGER.info(
            f"Querying '{self.metric_name}' for dimensions {self.dimensions_list} "
            f"from {_time(start_time)} to {_time(now)}"
        )

        response = self.client.get_metric_data(
            MetricDataQueries=[
                {
                    "Id": self.metric_id,
                    "MetricStat": {
                        "Metric": {
                            "Namespace": self.namespace,
                            "MetricName": self.metric_name,
                            "Dimensions": [dim.dict() for dim in self.dimensions_list],
                        },
                        "Stat": stat,
                        "Unit": str(
                            self.metric_unit
                        ),  # str(None) is desired, if no unit is specified
                        "Period": period,
                    },
                },
            ],
            StartTime=start_time,
            EndTime=now,
        )
        resp_status = response["ResponseMetadata"]["HTTPStatusCode"]
        if resp_status != 200:
            _LOGGER.error(f"Invalid response status code: {resp_status}")
            return None
        _LOGGER.debug(f"Response status code: {resp_status}")
        return response

    def _ec2_instances_by_id(self, ec2_instance_id: str) -> List:
        """
        Return the EC2 instances matching the given instance ID.

        Args:
            ec2_instance_id (str): the ID of the EC2 instance

        Returns:
            List: the matching EC2 instance objects
        """
        # materialize the lazy filter once; iterating it repeatedly would
        # issue a new DescribeInstances request each time
        return list(
            self.ec2_resource.instances.filter(
                Filters=[{"Name": "instance-id", "Values": [ec2_instance_id]}]
            )
        )

    def get_ec2_uptime(
        self,
        ec2_instance_id: str,
        days: int,
        hours: int,
        minutes: int,
        period: int = 60,
    ) -> Optional[float]:
        """
        Get the runtime of an EC2 instance

        Args:
            ec2_instance_id (str): the ID of the EC2 instance
            days (int): how many days to subtract from the current date to determine
                the metric collection start time
            hours (int): how many hours to subtract from the current time to determine
                 the metric collection start time
            minutes (int): how many minutes to subtract from the current time to
                determine the metric collection start time
            period (int): the period of the metric, in seconds

        Returns:
            Optional[float]: the runtime of the EC2 instance in seconds, or
            None if it could not be determined
        """
        if not self.is_ec2_running(ec2_instance_id):
            _LOGGER.info(
                f"Instance '{ec2_instance_id}' is not running anymore. "
                f"Uptime will be estimated based on reported metrics in "
                f"the last {days} days"
            )
            # estimate uptime from the span of reported metrics
            metrics_response = self.query_ec2_metrics(
                days=days,
                hours=hours,
                minutes=minutes,
                stat="Maximum",  # any stat works
                period=period,  # most precise period that AWS stores for instances
                # where start time is between 3 hours and 15 days ago is 60 seconds
            )
            if metrics_response is None:
                return None
            timed_metrics = self.timed_metric_factory(metrics_response)
            try:
                # get_metric_data returns timestamps newest-first by default
                # (ScanBy=TimestampDescending), so index 0 is the latest
                # report; the previous variable names were swapped
                latest_metric_report_time = timed_metrics[-1].timestamps[0]
                earliest_metric_report_time = timed_metrics[-1].timestamps[-1]
                return (
                    latest_metric_report_time - earliest_metric_report_time
                ).total_seconds()
            except IndexError:
                _LOGGER.warning(f"No metric data found for EC2: {ec2_instance_id}")
                return None
        instances = self._ec2_instances_by_id(ec2_instance_id)
        if len(instances) != 1:
            raise Exception(f"Multiple EC2 instances matched by ID: {ec2_instance_id}")
        instance = instances[0]
        _LOGGER.info(
            f"Instance '{ec2_instance_id}' is still running. "
            f"Launch time: {instance.launch_time}"
        )
        return (datetime.datetime.now(pytz.utc) - instance.launch_time).total_seconds()

    def is_ec2_running(self, ec2_instance_id: str) -> bool:
        """
        Check if EC2 instance is running

        Args:
            ec2_instance_id (str): the ID of the EC2 instance

        Returns:
            bool: True if EC2 instance is running, False otherwise.
        """
        instances = self._ec2_instances_by_id(ec2_instance_id)
        if not instances:
            return False
        if len(instances) > 1:
            raise Exception(f"Multiple EC2 instances matched by ID: {ec2_instance_id}")
        for instance in instances:
            # check the status codes and their meanings:
            # https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceState.html # noqa: E501
            # codes 0 (pending) and 16 (running) indicate the instance is up
            if instance.state["Code"] <= 16:
                return True
        return False

    @staticmethod
    def timed_metric_factory(response: dict) -> List[TimedMetric]:
        """
        Create a collection of TimedMetrics from the CloudWatch client response.

        Args:
            response (dict): the response from the query

        Returns:
            List[TimedMetric]: a collection of TimedMetrics
        """
        return [
            TimedMetric(
                label=metric_data_result["Label"],
                timestamps=metric_data_result["Timestamps"],
                values=metric_data_result["Values"],
            )
            for metric_data_result in response["MetricDataResults"]
        ]

    def _exec_timed_metric_handler(
        self,
        handler_class: Type,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
        **kwargs,
    ) -> None:
        """
        Internal method to execute a TimedMetricHandler

        Args:
            handler_class (Type): the TimedMetricHandler to execute
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
            **kwargs: additional kwargs to pass to the handler

        Raises:
            ValueError: if neither response nor query_kwargs is provided
        """
        _LOGGER.debug(f"Executing '{handler_class.__name__}'")
        if response is None:
            if query_kwargs is not None:
                response = self.query_ec2_metrics(**query_kwargs)
            else:
                raise ValueError("Either response or query_kwargs must be provided")
        if response is None:
            # the query itself failed; nothing to handle
            return None
        timed_metrics = self.timed_metric_factory(response)
        for timed_metric in timed_metrics:
            # skip metrics with no data points
            if len(timed_metric.values) < 1:
                continue
            handler = handler_class(timed_metric=timed_metric)
            handler(**kwargs)

    def _exec_response_handler(
        self,
        handler_class: Type,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
        **kwargs,
    ) -> None:
        """
        Internal method to execute a ResponseHandler

        Args:
            handler_class (Type): the ResponseHandler to execute
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
            **kwargs: additional kwargs to pass to the handler

        Raises:
            ValueError: if neither response nor query_kwargs is provided
        """
        _LOGGER.debug(f"Executing '{handler_class.__name__}'")
        if response is None:
            if query_kwargs is not None:
                response = self.query_ec2_metrics(**query_kwargs)
            else:
                raise ValueError("Either response or query_kwargs must be provided")
        handler = handler_class(response=response)
        # **kwargs is always a dict here (possibly empty), and calling with an
        # empty mapping equals calling with no arguments, so the previous
        # `if kwargs is None` branch was dead code
        handler(**kwargs)

    def save_metric_json(
        self,
        file_path: str,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
    ):
        """
        Query and save the metric data to a JSON file

        Args:
            file_path (str): the file path to save the metric data to
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
        """
        self._exec_timed_metric_handler(
            TimedMetricJsonSaver,
            target=file_path,
            response=response,
            query_kwargs=query_kwargs,
        )

    def save_metric_csv(
        self,
        file_path: str,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
    ):
        """
        Query and save the metric data to a CSV file

        Args:
            file_path (str): the file path to save the metric data to
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
        """
        self._exec_timed_metric_handler(
            TimedMetricCsvSaver,
            target=file_path,
            response=response,
            query_kwargs=query_kwargs,
        )

    def log_metric(self, response: Optional[Dict] = None):
        """
        Query and log the metric data

        Args:
            response (Optional[Dict]): the response from the query
        """
        self._exec_timed_metric_handler(
            TimedMetricLogger,
            target=None,  # TODO: add support for saving to file
            response=response,
        )

    def save_metric_plot(
        self,
        file_path: str,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
    ):
        """
        Query and plot the metric data

        Args:
            file_path (str): the file path to save the metric data to
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
        """
        self._exec_timed_metric_handler(
            TimedMetricPlotter,
            target=file_path,
            metric_unit=self.metric_unit,
            response=response,
            query_kwargs=query_kwargs,
        )

    def log_metric_summary(self, response: Optional[Dict] = None):
        """
        Query and summarize the metric data

        Args:
            response (Optional[Dict]): the response from the query
        """
        self._exec_timed_metric_handler(
            TimedMetricSummarizer,
            target=None,  # TODO: add support for saving to file
            metric_unit=self.metric_unit,
            summarizer=("Max", max),
            response=response,
        )

    def save_response_json(
        self,
        file_path: str,
        response: Optional[Dict] = None,
        query_kwargs: Optional[Dict] = None,
    ):
        """
        Query and save the response data to a JSON file

        Args:
            file_path (str): the file path to save the response data to
            response (Optional[Dict]): the response from the query
            query_kwargs (Optional[Dict]): the query kwargs to use for the query
        """
        self._exec_response_handler(
            ResponseSaver,
            target=file_path,
            response=response,
            query_kwargs=query_kwargs,
        )

    def log_response(self, response: Optional[Dict] = None):
        """
        Query and log the response

        Args:
            response (Optional[Dict]): the response from the query
        """
        self._exec_response_handler(
            ResponseLogger,
            target=None,
            response=response,
        )

__init__(namespace, dimensions_list, metric_name, metric_id, metric_unit=None, metric_description=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, aws_region_name=None)

Initialize MetricWatcher

Parameters:

Name Type Description Default
namespace str

the namespace of the metric

required
dimensions_list List[Dimension]

the dimensions of the metric

required
metric_name str

the name of the metric

required
metric_id str

the ID of the metric

required
metric_unit Optional[str]

the unit of the metric

None
aws_access_key_id Optional[str]

the AWS access key ID

None
aws_secret_access_key Optional[str]

the AWS secret access key

None
aws_session_token Optional[str]

the AWS session token

None
aws_region_name Optional[str]

the AWS region name

None
Source code in cloudwatcher/metricwatcher.py
def __init__(
    self,
    namespace: str,
    dimensions_list: List[Dimension],
    metric_name: str,
    metric_id: str,
    metric_unit: Optional[str] = None,
    metric_description: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    aws_region_name: Optional[str] = None,
) -> None:
    """
    Initialize MetricWatcher

    Args:
        namespace (str): the namespace of the metric
        dimensions_list (List[Dimension]): the dimensions of the metric
        metric_name (str): the name of the metric
        metric_id (str): the ID of the metric
        metric_unit (Optional[str]): the unit of the metric
        metric_description (Optional[str]): a human-readable description
            of the metric
        aws_access_key_id (Optional[str]): the AWS access key ID
        aws_secret_access_key (Optional[str]): the AWS secret access key
        aws_session_token (Optional[str]): the AWS session token
        aws_region_name (Optional[str]): the AWS region name
    """
    # the base class builds the "cloudwatch" boto3 client
    super().__init__(
        service_name="cloudwatch",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        aws_region_name=aws_region_name,
    )
    self.namespace = namespace
    self.dimensions_list = dimensions_list
    self.metric_name = metric_name
    self.metric_id = metric_id
    self.metric_unit = metric_unit
    # separate EC2 resource, used to inspect instance state and launch time
    self.ec2_resource = boto3.resource(
        service_name="ec2",
        region_name=self.aws_region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )
    self.metric_description = metric_description

get_ec2_uptime(ec2_instance_id, days, hours, minutes, period=60)

Get the runtime of an EC2 instance

Parameters:

Name Type Description Default
ec2_instance_id str

the ID of the EC2 instance

required
days int

how many days to subtract from the current date to determine the metric collection start time

required
hours int

how many hours to subtract from the current time to determine the metric collection start time

required
minutes int

how many minutes to subtract from the current time to determine the metric collection start time

required

Returns:

Name Type Description
float Optional[float]

the runtime of the EC2 instance in minutes

Source code in cloudwatcher/metricwatcher.py
def get_ec2_uptime(
    self,
    ec2_instance_id: str,
    days: int,
    hours: int,
    minutes: int,
    period: int = 60,
) -> Optional[float]:
    """
    Get the runtime of an EC2 instance, in seconds.

    For a running instance the uptime is measured from its launch time.
    For an instance that is no longer running, it is estimated as the span
    between the oldest and newest reported metric data points.

    Args:
        ec2_instance_id (str): the ID of the EC2 instance
        days (int): how many days to subtract from the current date to determine
            the metric collection start time
        hours (int): how many hours to subtract from the current time to determine
            the metric collection start time
        minutes (int): how many minutes to subtract from the current time to
            determine the metric collection start time
        period (int): metric granularity in seconds; 60 is the most precise
            period that AWS stores for instances where the start time is
            between 3 hours and 15 days ago

    Returns:
        Optional[float]: the runtime of the EC2 instance in seconds, or
            None if no metric data is available

    Raises:
        Exception: if the ID does not match exactly one running instance
    """
    if not self.is_ec2_running(ec2_instance_id):
        _LOGGER.info(
            f"Instance '{ec2_instance_id}' is not running anymore. "
            f"Uptime will be estimated based on reported metrics in "
            f"the last {days} days"
        )
        # get the latest reported metric
        metrics_response = self.query_ec2_metrics(
            days=days,
            hours=hours,
            minutes=minutes,
            stat="Maximum",  # any stat works
            period=period,
        )
        if metrics_response is None:
            return None
        timed_metrics = self.timed_metric_factory(metrics_response)
        try:
            # get_metric_data returns data points newest-first by default
            # (ScanBy=TimestampDescending), so index 0 is the most recent
            # report and index -1 the oldest one
            latest_metric_report_time = timed_metrics[-1].timestamps[0]
            earliest_metric_report_time = timed_metrics[-1].timestamps[-1]
            return (
                latest_metric_report_time - earliest_metric_report_time
            ).total_seconds()
        except IndexError:
            _LOGGER.warning(f"No metric data found for EC2: {ec2_instance_id}")
            return None
    # materialize the lazy boto3 collection once
    instances = list(
        self.ec2_resource.instances.filter(
            Filters=[{"Name": "instance-id", "Values": [ec2_instance_id]}]
        )
    )
    if len(instances) != 1:
        raise Exception(
            f"Expected exactly 1 EC2 instance matched by ID "
            f"'{ec2_instance_id}', got {len(instances)}"
        )
    instance = instances[0]
    _LOGGER.info(
        f"Instance '{ec2_instance_id}' is still running. "
        f"Launch time: {instance.launch_time}"
    )
    return (datetime.datetime.now(pytz.utc) - instance.launch_time).total_seconds()

is_ec2_running(ec2_instance_id)

Check if EC2 instance is running

Parameters:

Name Type Description Default
ec2_instance_id str

the ID of the EC2 instance

required

Returns:

Name Type Description
bool bool

True if EC2 instance is running, False otherwise.

Source code in cloudwatcher/metricwatcher.py
def is_ec2_running(self, ec2_instance_id: str) -> bool:
    """
    Check if an EC2 instance is running.

    Args:
        ec2_instance_id (str): the ID of the EC2 instance

    Returns:
        bool: True if the EC2 instance is pending or running, False if no
            instance matches the ID or it is in any other state

    Raises:
        Exception: if more than one instance matches the ID
    """
    # materialize the filtered collection once: every iteration of a lazy
    # boto3 collection re-issues the underlying DescribeInstances call
    instances = list(
        self.ec2_resource.instances.filter(
            Filters=[{"Name": "instance-id", "Values": [ec2_instance_id]}]
        )
    )
    if len(instances) == 0:
        return False
    if len(instances) > 1:
        raise Exception(f"Multiple EC2 instances matched by ID: {ec2_instance_id}")
    for instance in instances:
        # check the status codes and their meanings:
        # https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceState.html # noqa: E501
        # 0 (pending) and 16 (running) both count as "running" here
        if instance.state["Code"] <= 16:
            return True
    return False

log_metric(response=None)

Query and log the metric data

Parameters:

Name Type Description Default
response Optional[Dict]

the response from the query

None
Source code in cloudwatcher/metricwatcher.py
def log_metric(self, response: Optional[Dict] = None):
    """
    Query the metric data and emit it via the logger.

    Args:
        response (Optional[Dict]): an existing query response to reuse
    """
    # file output is not supported yet (TODO), hence target=None
    handler_kwargs = {"target": None, "response": response}
    self._exec_timed_metric_handler(TimedMetricLogger, **handler_kwargs)

log_metric_summary(response=None)

Query and summarize the metric data to a JSON file

Parameters:

Name Type Description Default
response Optional[Dict]

the response from the query

None
Source code in cloudwatcher/metricwatcher.py
def log_metric_summary(self, response: Optional[Dict] = None):
    """
    Query and summarize the metric data.

    The summary is currently only logged; saving it to a JSON file is not
    yet supported (hence ``target=None`` below).

    Args:
        response (Optional[Dict]): an existing response from the query to
            reuse
    """
    self._exec_timed_metric_handler(
        TimedMetricSummarizer,
        target=None,  # TODO: add support for saving to file
        metric_unit=self.metric_unit,
        summarizer=("Max", max),  # summarize by taking the maximum value
        response=response,
    )

log_response(response=None)

Query and log the response

Parameters:

Name Type Description Default
response Optional[Dict]

the response from the query

None
Source code in cloudwatcher/metricwatcher.py
def log_response(self, response: Optional[Dict] = None):
    """
    Query and log the raw query response.

    Args:
        response (Optional[Dict]): an existing response from the query to
            reuse
    """
    self._exec_response_handler(
        ResponseLogger,
        target=None,  # logging the response to a file is not supported
        response=response,
    )

query_ec2_metrics(days, hours, minutes, stat, period)

Query EC2 metrics

Parameters:

Name Type Description Default
days int

how many days to subtract from the current date to determine the metric collection start time

required
hours int

how many hours to subtract from the current time to determine the metric collection start time

required
minutes int

how many minutes to subtract from the current time to determine the metric collection start time

required
stat str

the statistic to query

required
period int

the period of the metric

required

Returns:

Name Type Description
Dict Optional[Dict]

the response from the query, check the structure of the

Optional[Dict]

response here # noqa: E501

Source code in cloudwatcher/metricwatcher.py
def query_ec2_metrics(
    self,
    days: int,
    hours: int,
    minutes: int,
    stat: str,
    period: int,
) -> Optional[Dict]:
    """
    Query EC2 metrics

    Args:
        days (int): how many days to subtract from the current date to determine
            the metric collection start time
        hours (int): how many hours to subtract from the current time to determine
            the metric collection start time
        minutes (int): how many minutes to subtract from the current time to
            determine the metric collection start time
        stat (str): the statistic to query
        period (int): the period of the metric, in seconds

    Returns:
        Dict: the response from the query, or None if the request did not
        return HTTP status 200; check the structure of the
        response [here](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch.html#CloudWatch.Client.get_metric_data) # noqa: E501

    Raises:
        ValueError: if the metric namespace is not set
    """
    if self.namespace is None:
        raise ValueError(f"Invalid metric namespace to watch: {self.namespace}")
    # compute the query time window, anchored at the current UTC time
    now = datetime.datetime.now(pytz.utc)
    start_time = now - datetime.timedelta(days=days, hours=hours, minutes=minutes)

    def _time(x: datetime.datetime):
        """
        Format a datetime object for logging

        Args:
            x (datetime.datetime): the datetime object to format

        Returns:
            str: the formatted timestamp
        """
        return x.strftime("%Y-%m-%d %H:%M:%S")

    _LOGGER.info(
        f"Querying '{self.metric_name}' for dimensions {self.dimensions_list} "
        f"from {_time(start_time)} to {_time(now)}"
    )

    # single metric data query, identified by self.metric_id
    response = self.client.get_metric_data(
        MetricDataQueries=[
            {
                "Id": self.metric_id,
                "MetricStat": {
                    "Metric": {
                        "Namespace": self.namespace,
                        "MetricName": self.metric_name,
                        "Dimensions": [dim.dict() for dim in self.dimensions_list],
                    },
                    "Stat": stat,
                    "Unit": str(
                        self.metric_unit
                    ),  # str(None) is desired, if no unit is specified
                    "Period": period,
                },
            },
        ],
        StartTime=start_time,
        EndTime=now,
    )
    # treat anything other than HTTP 200 as a failed query
    resp_status = response["ResponseMetadata"]["HTTPStatusCode"]
    if resp_status != 200:
        _LOGGER.error(f"Invalid response status code: {resp_status}")
        return None
    _LOGGER.debug(f"Response status code: {resp_status}")
    return response

save_metric_csv(file_path, response=None, query_kwargs=None)

Query and save the metric data to a CSV file

Parameters:

Name Type Description Default
file_path str

the file path to save the metric data to

required
response Optional[Dict]

the response from the query

None
query_kwargs Optional[Dict]

the query preset to use for the query

None
Source code in cloudwatcher/metricwatcher.py
def save_metric_csv(
    self,
    file_path: str,
    response: Optional[Dict] = None,
    query_kwargs: Optional[Dict] = None,
):
    """
    Query and save the metric data to a CSV file

    Args:
        file_path (str): the file path to save the metric data to
        response (Optional[Dict]): an existing response from the query to
            reuse
        query_kwargs (Optional[Dict]): keyword arguments forwarded to the
            query, e.g. a query preset
    """
    self._exec_timed_metric_handler(
        TimedMetricCsvSaver,
        target=file_path,
        response=response,
        query_kwargs=query_kwargs,
    )

save_metric_json(file_path, response=None, query_kwargs=None)

Query and save the metric data to a JSON file

Parameters:

Name Type Description Default
file_path str

the file path to save the metric data to

required
response Optional[Dict]

the response from the query

None
query_kwargs Optional[Dict]

the query preset to use for the query

None
Source code in cloudwatcher/metricwatcher.py
def save_metric_json(
    self,
    file_path: str,
    response: Optional[Dict] = None,
    query_kwargs: Optional[Dict] = None,
):
    """
    Query and save the metric data to a JSON file

    Args:
        file_path (str): the file path to save the metric data to
        response (Optional[Dict]): an existing response from the query to
            reuse
        query_kwargs (Optional[Dict]): keyword arguments forwarded to the
            query, e.g. a query preset
    """
    self._exec_timed_metric_handler(
        TimedMetricJsonSaver,
        target=file_path,
        response=response,
        query_kwargs=query_kwargs,
    )

save_metric_plot(file_path, response=None, query_kwargs=None)

Query and plot the metric data

Parameters:

Name Type Description Default
file_path str

the file path to save the metric data to

required
response Optional[Dict]

the response from the query

None
query_kwargs Optional[Dict]

the query preset to use for the query

None
Source code in cloudwatcher/metricwatcher.py
def save_metric_plot(
    self,
    file_path: str,
    response: Optional[Dict] = None,
    query_kwargs: Optional[Dict] = None,
):
    """
    Query and plot the metric data

    Args:
        file_path (str): the file path to save the plot to
        response (Optional[Dict]): an existing response from the query to
            reuse
        query_kwargs (Optional[Dict]): keyword arguments forwarded to the
            query, e.g. a query preset
    """
    self._exec_timed_metric_handler(
        TimedMetricPlotter,
        target=file_path,
        metric_unit=self.metric_unit,  # the plotter labels values with the unit
        response=response,
        query_kwargs=query_kwargs,
    )

save_response_json(file_path, response=None, query_kwargs=None)

Query and save the response data to a JSON file

Parameters:

Name Type Description Default
file_path str

the file path to save the response data to

required
response Optional[Dict]

the response from the query

None
query_kwargs Optional[Dict]

the query preset to use for the query

None
Source code in cloudwatcher/metricwatcher.py
def save_response_json(
    self,
    file_path: str,
    response: Optional[Dict] = None,
    query_kwargs: Optional[Dict] = None,
):
    """
    Query and save the raw query response to a JSON file

    Args:
        file_path (str): the file path to save the response data to
        response (Optional[Dict]): an existing response from the query to
            reuse
        query_kwargs (Optional[Dict]): keyword arguments forwarded to the
            query, e.g. a query preset
    """
    self._exec_response_handler(
        ResponseSaver,
        target=file_path,
        response=response,
        query_kwargs=query_kwargs,
    )

timed_metric_factory(response) staticmethod

Create a collection of TimedMetrics from the CloudWatch client response.

Parameters:

Name Type Description Default
response dict

the response from the query

required

Returns:

Type Description
List[TimedMetric]

List[TimedMetric]: a collection of TimedMetrics

Source code in cloudwatcher/metricwatcher.py
@staticmethod
def timed_metric_factory(response: dict) -> List[TimedMetric]:
    """
    Build TimedMetric objects from a CloudWatch get_metric_data response.

    Args:
        response (dict): the raw response; must contain "MetricDataResults"

    Returns:
        List[TimedMetric]: one TimedMetric per metric data result
    """
    timed_metrics: List[TimedMetric] = []
    for result in response["MetricDataResults"]:
        timed_metrics.append(
            TimedMetric(
                label=result["Label"],
                timestamps=result["Timestamps"],
                values=result["Values"],
            )
        )
    return timed_metrics

MetricWatcherSetup

A class for the setup of the MetricWatcher

Source code in cloudwatcher/preset.py
@dataclass
class MetricWatcherSetup:
    """
    A class for the setup of the MetricWatcher

    The attributes mirror the MetricWatcher constructor arguments; AWS
    credentials and region fall back to the standard environment variables.
    """

    namespace: str
    dimensions_list: List[Dimension]
    metric_name: str
    metric_id: str
    metric_unit: str
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    aws_region_name: Optional[str] = None
    metric_description: Optional[str] = None

    def __post_init__(self):
        """
        Fill unset AWS settings from the environment and normalize the
        dimensions list to Dimension objects.
        """
        self.aws_access_key_id = self.aws_access_key_id or os.environ.get(
            "AWS_ACCESS_KEY_ID"
        )
        self.aws_secret_access_key = self.aws_secret_access_key or os.environ.get(
            "AWS_SECRET_ACCESS_KEY"
        )
        self.aws_session_token = self.aws_session_token or os.environ.get(
            "AWS_SESSION_TOKEN"
        )
        self.aws_region_name = self.aws_region_name or os.environ.get(
            "AWS_DEFAULT_REGION"
        )
        # accept both raw dicts and ready-made Dimension objects
        self.dimensions_list = [
            dimension if isinstance(dimension, Dimension) else Dimension(**dimension)
            for dimension in self.dimensions_list
        ]

    @classmethod
    def from_dict(cls, data: dict) -> "MetricWatcherSetup":
        """
        Create a MetricWatcherSetup object from a dictionary

        Args:
            data (dict): The dictionary to use; keys must match the field
                names of this class

        Returns:
            MetricWatcherSetup: the created object
        """
        return cls(**data)

    @classmethod
    def from_json(cls, file_path: Path) -> "MetricWatcherSetup":
        """
        Create a MetricWatcherSetup object from a JSON file

        Args:
            file_path (Path): The path to the JSON file

        Returns:
            MetricWatcherSetup: the created object
        """
        with open(file_path) as f:
            data = json.load(f)
        return cls.from_dict(data)

    def to_dict(self) -> dict:
        """
        Convert the MetricWatcherSetup object to a dictionary

        Returns:
            dict: The dictionary representation of the object. Note: this
                is the live ``__dict__`` of the instance, not a copy.
        """
        return self.__dict__

    def upsert_dimensions(self, dimensions_specs: Optional[List[str]] = None):
        """
        Update or insert dimensions based on "Name:Value" specifications.

        Args:
            dimensions_specs (Optional[List[str]]): A list of strings.
                Format: "Name:Value"

        Raises:
            ValueError: if a specification contains no colon
        """
        if dimensions_specs is None:
            return
        for dimension_spec in dimensions_specs:
            # split on the first colon only, so values may contain colons
            name, value = dimension_spec.split(":", 1)
            for dimension in self.dimensions_list:
                if dimension.Name == name:
                    # a dimension with this name exists: update its value
                    dimension.Value = value
                    break
            else:
                # for/else: no break, the name was not found -- insert it
                self.dimensions_list.append(Dimension(Name=name, Value=value))

from_dict(data) classmethod

Create a MetricWatcherSetup object from a dictionary

Parameters:

Name Type Description Default
data dict

The dictionary to use

required
Source code in cloudwatcher/preset.py
@classmethod
def from_dict(cls, data: dict) -> "MetricWatcherSetup":
    """
    Create a MetricWatcherSetup object from a dictionary

    Args:
        data (dict): The dictionary to use; keys must match the field
            names of MetricWatcherSetup

    Returns:
        MetricWatcherSetup: the created object
    """
    return cls(**data)

from_json(file_path) classmethod

Create a MetricWatcherSetup object from a JSON file

Parameters:

Name Type Description Default
file_path Path

The path to the JSON file

required
Source code in cloudwatcher/preset.py
@classmethod
def from_json(cls, file_path: Path) -> "MetricWatcherSetup":
    """
    Create a MetricWatcherSetup object from a JSON file

    Args:
        file_path (Path): The path to the JSON file

    Returns:
        MetricWatcherSetup: the created object
    """
    with open(file_path) as f:
        data = json.load(f)
    return cls.from_dict(data)

to_dict()

Convert the MetricWatcherSetup object to a dictionary

Returns:

Name Type Description
dict dict

The dictionary representation of the object

Source code in cloudwatcher/preset.py
def to_dict(self) -> dict:
    """
    Convert the MetricWatcherSetup object to a dictionary

    Returns:
        dict: The dictionary representation of the object. Note: this is
            the live ``__dict__`` of the instance, not a copy, so mutating
            it mutates the object.
    """
    return self.__dict__

upsert_dimensions(dimensions_specs=None)

Upsert the dimensions list with the dimensions specified in the environment

Parameters:

Name Type Description Default
dimensions_specs List[str]

A list of strings. Format: "Name:Value"

None
Source code in cloudwatcher/preset.py
def upsert_dimensions(self, dimensions_specs: Optional[List[str]] = None):
    """
    Update or insert dimensions based on "Name:Value" specifications.

    Args:
        dimensions_specs (Optional[List[str]]): A list of strings.
            Format: "Name:Value"

    Raises:
        ValueError: if a spec does not split into exactly two parts
    """
    if dimensions_specs is None:
        return
    for dimension_spec in dimensions_specs:
        name, value = dimension_spec.split(":")
        for dimension in self.dimensions_list:
            if dimension.Name == name:
                # a dimension with this name exists: update it in place
                dimension.Value = value
                break
        else:
            # for/else: loop ended without break, i.e. name not found --
            # insert a new dimension
            self.dimensions_list.append(Dimension(Name=name, Value=value))

PresetFilesInventory

Source code in cloudwatcher/preset.py
class PresetFilesInventory:
    """Inventory of the JSON preset files available to the package."""

    def __init__(self, presets_dir: Optional[Union[Path, str]] = None) -> None:
        """
        Initialize the preset inventory

        Args:
            presets_dir (Optional[Union[Path, str]]): The path to the
                presets directory; defaults to the "presets" directory
                bundled next to this module

        Raises:
            ValueError: If the presets directory does not exist
        """
        if presets_dir is None:
            resolved_dir = Path(__file__).parent / "presets"
        else:
            resolved_dir = Path(presets_dir)
        if not resolved_dir.exists():
            raise ValueError(f"Presets directory {resolved_dir} does not exist")
        self._presets_dir = resolved_dir
        _LOGGER.debug(f"Presets directory: {self.presets_dir}")
        self._presets = self._get_available_presets(self.presets_dir)

    def _get_available_presets(self, presets_dir: Path) -> Dict[str, Path]:
        """Map preset names (file stems) to their JSON file paths."""
        available: Dict[str, Path] = {}
        for candidate in presets_dir.iterdir():
            if candidate.is_file() and candidate.suffix == ".json":
                available[candidate.stem] = candidate
        return available

    @property
    def presets_table(self) -> Table:
        """
        Get a rich table with the available presets

        Returns:
            Table: The rich table
        """
        table = Table(show_header=True, header_style="bold magenta")
        table.add_column("Name")
        table.add_column("Path", style="dim")
        for name, path in self.presets.items():
            table.add_row(name, path.as_posix())
        table.title = f"Presets available in: {self.presets_dir}"
        return table

    @property
    def presets(self) -> Dict[str, Path]:
        """
        Get the available presets

        Returns:
            Dict[str, Path]: The available presets
        """
        return self._presets

    @property
    def presets_list(self) -> List[str]:
        """
        Get the list of available presets

        Returns:
            List[str]: The list of available presets
        """
        return list(self._presets)

    @property
    def presets_dir(self) -> Path:
        """
        Get the presets directory

        Returns:
            Path: The presets directory
        """
        return self._presets_dir

    def get_preset_path(self, preset_name: str) -> Path:
        """
        Get the path to a named preset file

        Args:
            preset_name (str): The name of the preset

        Returns:
            Path: the path to the preset file

        Raises:
            ValueError: if the preset name is not in the inventory
        """
        available = self.presets
        if preset_name in available:
            return available[preset_name]
        raise ValueError(
            f"Preset {preset_name} not found. Available presets: "
            f"{', '.join(available.keys())}"
        )

presets: Dict[str, Path] property

Get the available presets

Returns:

Type Description
Dict[str, Path]

Dict[str, Path]: The available presets

presets_dir: Path property

Get the presets directory

Returns:

Name Type Description
Path Path

The presets directory

presets_list: List[str] property

Get the list of available presets

Returns:

Type Description
List[str]

List[str]: The list of available presets

presets_table: Table property

Get a rich table with the available presets

Returns:

Name Type Description
Table Table

The rich table

__init__(presets_dir=None)

Initialize the preset inventory

Parameters:

Name Type Description Default
presets_dir Path

The path to the presets directory

None

Raises:

Type Description
ValueError

If the presets directory does not exist

Source code in cloudwatcher/preset.py
def __init__(self, presets_dir: Optional[Union[Path, str]] = None) -> None:
    """
    Initialize the preset inventory

    Args:
        presets_dir (Optional[Union[Path, str]]): The path to the presets
            directory; defaults to the "presets" directory bundled next to
            this module

    Raises:
        ValueError: If the presets directory does not exist
    """
    preset_dir = (
        Path(presets_dir)
        if presets_dir is not None
        else Path(__file__).parent / "presets"
    )
    if not preset_dir.exists():
        raise ValueError(f"Presets directory {preset_dir} does not exist")
    self._presets_dir = preset_dir
    _LOGGER.debug(f"Presets directory: {self.presets_dir}")
    # index the preset JSON files once, at construction time
    self._presets = self._get_available_presets(self.presets_dir)

get_preset_path(preset_name)

Get the preset file content

Parameters:

Name Type Description Default
preset_name str

The name of the preset

required

Returns:

Name Type Description
Path Path

the path to the preset file

Source code in cloudwatcher/preset.py
def get_preset_path(self, preset_name: str) -> Path:
    """
    Resolve a preset name to the path of its JSON file.

    Args:
        preset_name (str): The name of the preset

    Returns:
        Path: the path to the preset file

    Raises:
        ValueError: if the preset name is not in the inventory
    """
    available = self.presets
    if preset_name in available:
        return available[preset_name]
    raise ValueError(
        f"Preset {preset_name} not found. Available presets: "
        f"{', '.join(available.keys())}"
    )

Metric handlers

ResponseHandler

Abstract class to establish the interface for a response handling

Source code in cloudwatcher/metric_handlers.py
class ResponseHandler:
    """
    Base class defining the interface for response handling.

    Subclasses implement ``__call__`` to consume the stored response.
    """

    def __init__(self, response: dict) -> None:
        """
        Store the AWS API response for later handling.

        Args:
            response (dict): The response from the AWS API
        """
        self.response = response
__init__(response)

Initialize the handler

Parameters:

Name Type Description Default
response dict

The response from the AWS API

required
Source code in cloudwatcher/metric_handlers.py
def __init__(self, response: dict) -> None:
    """
    Initialize the handler

    Args:
        response (dict): The response from the AWS API
    """
    # store the raw response; subclasses consume it in __call__
    self.response = response

ResponseLogger

Bases: ResponseHandler

Log the response to the console

Source code in cloudwatcher/metric_handlers.py
class ResponseLogger(ResponseHandler):
    """
    Log the response to the console
    """

    def __call__(self, target: str) -> None:
        """
        Log the stored response at debug level.

        Args:
            target (str): must be None; file targets are not supported

        Raises:
            NotImplementedError: if a target file is given
        """
        if target is not None:
            raise NotImplementedError(
                "Logging responses to a file is not yet implemented."
            )
        # default=str renders non-JSON-serializable values (e.g. datetimes)
        # as strings instead of raising
        _LOGGER.debug(json.dumps(self.response, indent=4, default=str))

ResponseSaver

Bases: ResponseHandler

Save the response to a file

Source code in cloudwatcher/metric_handlers.py
class ResponseSaver(ResponseHandler):
    """
    Save the response to a file
    """

    def __call__(self, target: str) -> None:
        """
        Save the response to a file

        Args:
            target (str): The target file to save the response to
        """
        # default=str renders non-JSON-serializable values (e.g. datetimes)
        # as strings instead of raising
        with open(target, "w") as f:
            json.dump(self.response, f, indent=4, default=str)
        _LOGGER.info(f"Saved response to: {target}")

__call__(target)

Save the response to a file

Parameters:

Name Type Description Default
target str

The target file to save the response to

required
Source code in cloudwatcher/metric_handlers.py
def __call__(self, target: str) -> None:
    """
    Save the response to a file

    Args:
        target (str): The target file to save the response to
    """
    # default=str renders non-JSON-serializable values (e.g. datetimes)
    # as strings instead of raising
    with open(target, "w") as f:
        json.dump(self.response, f, indent=4, default=str)
    _LOGGER.info(f"Saved response to: {target}")

TimedMetric dataclass

Timed metric object

Parameters:

Name Type Description Default
timestamps List[datetime]

The timestamps of the metric

required
values List[float]

The values of the metric

required
label str

The label of the metric

required
Source code in cloudwatcher/metric_handlers.py
@dataclass
class TimedMetric:
    """
    Timed metric object

    Args:
        timestamps (List[datetime]): The timestamps of the metric
        values (List[float]): The values of the metric
        label (str): The label of the metric
    """

    label: str
    timestamps: List[datetime]
    values: List[float]

    def __len__(self):
        """Return the number of data points; raise if the lists disagree."""
        n_timestamps = len(self.timestamps)
        n_values = len(self.values)
        if n_timestamps != n_values:
            raise ValueError("The internal timed metric lengths are not equal")
        return n_values

TimedMetricCsvSaver

Bases: TimedMetricHandler

Source code in cloudwatcher/metric_handlers.py
class TimedMetricCsvSaver(TimedMetricHandler):
    """Persist a timed metric to a CSV file with a header row."""

    def __call__(self, target: str) -> None:
        """
        Write the object to a csv file

        Args:
            target (str): The target file to save the object to
        """
        with open(target, "w", encoding="UTF8", newline="") as fh:
            writer = csv.writer(fh)
            writer.writerow(["time", "value"])
            # len() also validates that timestamps and values agree in length
            row_count = len(self.timed_metric)
            for idx in range(row_count):
                writer.writerow(
                    [self.timed_metric.timestamps[idx], self.timed_metric.values[idx]]
                )
        _LOGGER.info(f"Saved '{self.timed_metric.label}' data to: {target}")

__call__(target)

Write the object to a csv file

Parameters:

Name Type Description Default
target str

The target file to save the object to

required
Source code in cloudwatcher/metric_handlers.py
def __call__(self, target: str) -> None:
    """
    Write the object to a csv file

    Args:
        target (str): The target file to save the object to
    """
    with open(target, "w", encoding="UTF8", newline="") as f:
        writer = csv.writer(f)

        # write the header
        writer.writerow(["time", "value"])
        # write the data; len() validates that timestamps and values have
        # equal lengths (raises ValueError otherwise)
        for i in range(len(self.timed_metric)):
            writer.writerow(
                [self.timed_metric.timestamps[i], self.timed_metric.values[i]]
            )
    _LOGGER.info(f"Saved '{self.timed_metric.label}' data to: {target}")

TimedMetricHandler

Class to establish the interface for a timed metric handling

Source code in cloudwatcher/metric_handlers.py
class TimedMetricHandler:
    """
    Base class defining the interface for timed metric handling.

    Subclasses implement ``__call__`` to consume the stored metric.
    """

    def __init__(self, timed_metric: TimedMetric) -> None:
        """
        Store the timed metric to be handled.

        Args:
            timed_metric (TimedMetric): The timed metric to use
        """
        self.timed_metric = timed_metric

__init__(timed_metric)

Initialize the handler

Parameters:

Name Type Description Default
timed_metric TimedMetric

The timed metric to use

required
Source code in cloudwatcher/metric_handlers.py
def __init__(self, timed_metric: TimedMetric) -> None:
    """
    Store the timed metric to be handled.

    Args:
        timed_metric (TimedMetric): the metric series this handler operates on
    """
    # Public attribute; subclasses read it in their __call__ implementations.
    self.timed_metric = timed_metric

TimedMetricJsonSaver

Bases: TimedMetricHandler

Source code in cloudwatcher/metric_handlers.py
class TimedMetricJsonSaver(TimedMetricHandler):
    def __call__(self, target: str) -> None:
        """
        Serialize the timed metric to a JSON file.

        Args:
            target (str): path of the JSON file to create
        """
        payload = {
            "Label": self.timed_metric.label,
            "Timestamps": self.timed_metric.timestamps,
            "Values": self.timed_metric.values,
        }
        with open(target, "w") as json_file:
            # default=str stringifies values json can't encode (e.g. datetimes)
            json.dump(payload, json_file, indent=4, default=str)
        _LOGGER.info(f"Saved '{self.timed_metric.label}' data to: {target}")

__call__(target)

Write the object to a json file

Parameters:

Name Type Description Default
target str

The target file to save the object to

required
Source code in cloudwatcher/metric_handlers.py
def __call__(self, target: str) -> None:
    """
    Serialize the timed metric to a JSON file.

    Args:
        target (str): path of the JSON file to create
    """
    payload = {
        "Label": self.timed_metric.label,
        "Timestamps": self.timed_metric.timestamps,
        "Values": self.timed_metric.values,
    }
    with open(target, "w") as json_file:
        # default=str stringifies values json can't encode (e.g. datetimes)
        json.dump(payload, json_file, indent=4, default=str)
    _LOGGER.info(f"Saved '{self.timed_metric.label}' data to: {target}")

TimedMetricLogger

Bases: TimedMetricHandler

Source code in cloudwatcher/metric_handlers.py
class TimedMetricLogger(TimedMetricHandler):
    def __call__(self, target: str) -> None:
        """
        Render the timed metric as a table on the console.
        """
        # Console-only handler: a non-None target is rejected rather than ignored.
        if target is not None:
            raise NotImplementedError("Logging to a file is not yet implemented.")
        table = Table(show_header=True, header_style="bold magenta")
        table.add_column(f"Time ({str(pytz.utc)})", style="dim", justify="center")
        table.add_column("Value")
        # Metrics labeled "mem*" are shown as human-readable sizes.
        is_mem = self.timed_metric.label.startswith("mem")
        rendered = [
            self.mem_to_str(v) if is_mem else str(v)
            for v in self.timed_metric.values
        ]
        for idx, stamp in enumerate(self.timed_metric.timestamps):
            table.add_row(stamp.strftime("%H:%M:%S"), rendered[idx])
        Console().print(table)

    @staticmethod
    def mem_to_str(size: float, precision: int = 3) -> str:
        """
        Convert a byte count to a human readable string.

        Args:
            size (float): the size in bytes
            precision (int): number of decimal places to render

        Returns:
            str: the human readable representation
        """
        scaled, suffix = convert_mem(size)
        return "%.*f %s" % (precision, scaled, suffix)

__call__(target)

Log the timed metric as a table

Source code in cloudwatcher/metric_handlers.py
def __call__(self, target: str) -> None:
    """
    Log the timed metric as a table
    """
    # Console-only handler: a non-None target is rejected rather than ignored.
    if target is not None:
        raise NotImplementedError("Logging to a file is not yet implemented.")
    table = Table(show_header=True, header_style="bold magenta")
    # Column header labels the times as UTC (via pytz.utc).
    table.add_column(f"Time ({str(pytz.utc)})", style="dim", justify="center")
    table.add_column("Value")
    # Metrics whose label starts with "mem" are shown as human-readable sizes.
    values = [
        self.mem_to_str(v) if self.timed_metric.label.startswith("mem") else str(v)
        for v in self.timed_metric.values
    ]
    for i in range(len(self.timed_metric.timestamps)):
        table.add_row(
            self.timed_metric.timestamps[i].strftime("%H:%M:%S"), values[i]
        )
    console = Console()
    console.print(table)

mem_to_str(size, precision=3) staticmethod

Convert bytes to human readable string

Parameters:

Name Type Description Default
size float

The size in bytes

required
precision int

The precision to use, number of decimal places

3

Returns:

Name Type Description
str str

The human readable string

Source code in cloudwatcher/metric_handlers.py
@staticmethod
def mem_to_str(size: float, precision: int = 3) -> str:
    """
    Convert a byte count to a human readable string.

    Args:
        size (float): the size in bytes
        precision (int): number of decimal places to render

    Returns:
        str: the human readable representation
    """
    # convert_mem picks the largest unit that keeps the value readable.
    scaled, suffix = convert_mem(size)
    return "%.*f %s" % (precision, scaled, suffix)

TimedMetricPlotter

Bases: TimedMetricHandler

Source code in cloudwatcher/metric_handlers.py
class TimedMetricPlotter(TimedMetricHandler):

    def __call__(self, target: str, metric_unit: str) -> None:
        """
        Plot the timed metric and save it as a PNG file.

        Args:
            target (str): The target file to save the plot to
            metric_unit (str): The unit of the metric
        """
        values = self.timed_metric.values
        # Memory metrics reported in Bytes are rescaled to GB for readability.
        if self.timed_metric.label.startswith("mem") and metric_unit == "Bytes":
            metric_unit = "GB"
            values = [convert_mem(v, force_suffix=metric_unit)[0] for v in values]
        fig = plt.figure()
        try:
            plt.plot(
                self.timed_metric.timestamps,
                values,
                linewidth=0.8,
            )
            plt.title(
                f"{self.timed_metric.label} over time",
                loc="right",
                fontstyle="italic",
            )
            plt.ylabel(f"{self.timed_metric.label} ({metric_unit})")
            # Plain y-axis labels: no scientific notation or offset.
            plt.ticklabel_format(axis="y", style="plain", useOffset=False)
            plt.tick_params(left=True, bottom=False, labelleft=True, labelbottom=False)
            # Output is always PNG, regardless of the target's extension.
            plt.savefig(
                target,
                bbox_inches="tight",
                pad_inches=0.1,
                dpi=300,
                format="png",
            )
        finally:
            # Close the figure so repeated calls do not leak matplotlib figures.
            plt.close(fig)
        _LOGGER.info(f"Saved '{self.timed_metric.label}' plot to: {target}")

__call__(target, metric_unit)

Plot the timed metric

Parameters:

Name Type Description Default
target str

The target file to save the plot to

required
metric_unit str

The unit of the metric

required
Source code in cloudwatcher/metric_handlers.py
def __call__(self, target: str, metric_unit: str) -> None:
    """
    Plot the timed metric

    Args:
        target (str): The target file to save the plot to
        metric_unit (str): The unit of the metric
    """
    values = self.timed_metric.values
    # Memory metrics reported in Bytes are rescaled to GB for readability.
    if self.timed_metric.label.startswith("mem") and metric_unit == "Bytes":
        metric_unit = "GB"
        values = [convert_mem(v, force_suffix=metric_unit)[0] for v in values]
    # NOTE(review): the figure is never closed (plt.close) — repeated calls
    # accumulate open matplotlib figures.
    plt.figure()
    plt.plot(
        self.timed_metric.timestamps,
        values,
        linewidth=0.8,
    )
    plt.title(
        f"{self.timed_metric.label} over time",
        loc="right",
        fontstyle="italic",
    )
    plt.ylabel(f"{self.timed_metric.label} ({metric_unit})")
    # Plain y-axis labels: no scientific notation or offset.
    plt.ticklabel_format(axis="y", style="plain", useOffset=False)
    plt.tick_params(left=True, bottom=False, labelleft=True, labelbottom=False)
    # Output is always PNG, regardless of the target's extension.
    plt.savefig(
        target,
        bbox_inches="tight",
        pad_inches=0.1,
        dpi=300,
        format="png",
    )
    _LOGGER.info(f"Saved '{self.timed_metric.label}' plot to: {target}")

TimedMetricSummarizer

Bases: TimedMetricHandler

Source code in cloudwatcher/metric_handlers.py
class TimedMetricSummarizer(TimedMetricHandler):
    def __call__(
        self,
        target: str,
        metric_unit: str,
        summarizer: Tuple[str, Callable],
    ) -> None:
        """
        Summarize the metric

        Args:
            target (str): The target file to save the summary to
            metric_unit (str): The unit of the metric
            summarizer (Tuple[str, Callable]): the summarizer's display name
                and the function that reduces the list of values

        Raises:
            NotImplementedError: if a target file is provided
        """
        # Console-only handler: a non-None target is rejected rather than ignored.
        if target is not None:
            raise NotImplementedError("Logging to a file is not yet implemented.")
        # NOTE(review): first minus last — negative if timestamps ascend;
        # confirm the intended ordering of the series.
        timespan = self.timed_metric.timestamps[0] - self.timed_metric.timestamps[-1]
        _LOGGER.info(
            f"Retrieved '{self.timed_metric.label}' {len(self.timed_metric.values)} "
            f"measurements over {timespan} timespan"
        )
        # Reduce all values with the provided summarizing function.
        summary = summarizer[1](self.timed_metric.values)
        if self.timed_metric.label.startswith("mem") and metric_unit == "Bytes":
            # Memory summaries are rescaled to a human-readable unit.
            mem, metric_unit = convert_mem(summary)
            _LOGGER.info(
                f"{summarizer[0]} '{self.timed_metric.label}' is "
                f"{mem:.2f} {metric_unit} over {timespan} timespan"
            )
        else:
            _LOGGER.info(
                f"{summarizer[0]} '{self.timed_metric.label}' is "
                f"{summary} over {timespan} timespan"
            )

__call__(target, metric_unit, summarizer)

Summarize the metric

Parameters:

Name Type Description Default
target str

The target file to save the summary to

required
metric_unit str

The unit of the metric

required
summarizer Tuple[str, callable]

The summarizer to use and the function to use

required
Source code in cloudwatcher/metric_handlers.py
def __call__(
    self,
    target: str,
    metric_unit: str,
    summarizer: Tuple[str, Callable],
) -> None:
    """
    Summarize the metric

    Args:
        target (str): The target file to save the summary to
        metric_unit (str): The unit of the metric
        summarizer (Tuple[str, Callable]): the summarizer's display name
            and the function that reduces the list of values

    Raises:
        NotImplementedError: if a target file is provided
    """
    # Console-only handler: a non-None target is rejected rather than ignored.
    if target is not None:
        raise NotImplementedError("Logging to a file is not yet implemented.")
    # NOTE(review): first minus last — negative if timestamps ascend;
    # confirm the intended ordering of the series.
    timespan = self.timed_metric.timestamps[0] - self.timed_metric.timestamps[-1]
    _LOGGER.info(
        f"Retrieved '{self.timed_metric.label}' {len(self.timed_metric.values)} "
        f"measurements over {timespan} timespan"
    )
    # Reduce all values with the provided summarizing function.
    summary = summarizer[1](self.timed_metric.values)
    if self.timed_metric.label.startswith("mem") and metric_unit == "Bytes":
        # Memory summaries are rescaled to a human-readable unit.
        mem, metric_unit = convert_mem(summary)
        _LOGGER.info(
            f"{summarizer[0]} '{self.timed_metric.label}' is "
            f"{mem:.2f} {metric_unit} over {timespan} timespan"
        )
    else:
        _LOGGER.info(
            f"{summarizer[0]} '{self.timed_metric.label}' is "
            f"{summary} over {timespan} timespan"
        )

convert_mem(value, force_suffix=None)

Convert memory in bytes to the highest possible, or desired memory unit

Parameters:

Name Type Description Default
value float

The memory in bytes

required
force_suffix str

The desired memory unit

None

Returns:

Type Description
Tuple[float, str]

Tuple[float, str]: The memory in the desired unit and the unit

Source code in cloudwatcher/metric_handlers.py
def convert_mem(value: float, force_suffix: Optional[str] = None) -> Tuple[float, str]:
    """
    Convert memory in bytes to the largest sensible, or desired, memory unit

    Args:
        value (float): The memory in bytes
        force_suffix (Optional[str]): The desired memory unit, one of
            "B", "KB", "MB", "GB", "TB". If None, the largest unit that
            keeps the value at or below 1024 is chosen.

    Returns:
        Tuple[float, str]: The converted value and the unit it is expressed in

    Raises:
        ValueError: If force_suffix is not a recognized memory unit
    """
    # NOTE: scaling is 1024-based although the labels are SI-style (KB == KiB here).
    suffixes = ["B", "KB", "MB", "GB", "TB"]
    if force_suffix is not None:
        try:
            idx = suffixes.index(force_suffix)
        except ValueError:
            # Fixed typo in the user-facing message ("must me" -> "must be").
            raise ValueError(f"Forced memory unit must be one of: {suffixes}")
        return value / float(pow(1024, idx)), force_suffix
    suffix_index = 0
    # Divide by 1024 until the value fits the unit or units run out.
    while value > 1024 and suffix_index < len(suffixes) - 1:
        suffix_index += 1
        value /= 1024.0
    return value, suffixes[suffix_index]

LogEvent

Bases: BaseModel

A class for AWS CloudWatch log events

Attributes:

Name Type Description
message str

The log message

timestamp datetime

The log timestamp

Source code in cloudwatcher/logwatcher.py
class LogEvent(BaseModel):
    """
    A class for AWS CloudWatch log events

    Attributes:
        message (str): The log message
        timestamp (datetime): The log timestamp
    """

    message: str
    timestamp: datetime

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> "LogEvent":
        """
        Build a LogEvent from a raw AWS response event.

        Args:
            response (Dict[str, Any]): a single event dict from AWS

        Returns:
            LogEvent: the parsed log event
        """
        # AWS reports the timestamp in milliseconds since the epoch.
        epoch_seconds = response["timestamp"] / 1000
        return cls(
            message=response["message"],
            timestamp=datetime.fromtimestamp(epoch_seconds),
        )

    def format_message(
        self,
        regex: Optional[str] = None,
        fmt_str_log: Optional[str] = None,
        fmt_str_datetime: Optional[str] = None,
    ) -> "LogEvent":
        """
        Strip the timestamp embedded in the message and prepend a UTC one.

        Args:
            regex (Optional[str]): pattern matching the embedded timestamp
            fmt_str_log (Optional[str]): format string for the log line
            fmt_str_datetime (Optional[str]): format string for the datetime

        Returns:
            LogEvent: a new event carrying the reformatted message
        """
        if regex is None:
            regex = r"^\[\d+-\d+-\d+\s\d+:\d+:\d+(.|,)\d+(\]|\s-\s\w+\])"
        if fmt_str_log is None:
            fmt_str_log = "[{time} UTC] {message}"
        if fmt_str_datetime is None:
            fmt_str_datetime = "%d-%m-%Y %H:%M:%S"
        match = re.search(regex, self.message)
        # Drop the matched prefix when an embedded timestamp is found.
        body = self.message if match is None else self.message[match.end() :]
        new_message = fmt_str_log.format(
            time=self.timestamp.strftime(fmt_str_datetime), message=body.strip()
        )
        return LogEvent(message=new_message, timestamp=self.timestamp)

    def __bool__(self) -> bool:
        """
        Report whether the event carries a non-empty message.

        Returns:
            bool: True if the message is not empty
        """
        return len(self.message) > 0

__bool__()

Return True if the message is not empty

Returns:

Name Type Description
bool bool

True if the message is not empty

Source code in cloudwatcher/logwatcher.py
def __bool__(self) -> bool:
    """
    Report whether the event carries a non-empty message.

    Returns:
        bool: True if the message is not empty
    """
    # An event with an empty message is treated as falsy.
    return len(self.message) > 0

format_message(regex=None, fmt_str_log=None, fmt_str_datetime=None)

Format the message by removing the embedded timestamp and adding a UTC timestamp

Parameters:

Name Type Description Default
regex str

regex to match the timestamp in the message

None
fmt_str_log str

format string for the log message

None
fmt_str_datetime str

format string for the datetime

None

Returns:

Name Type Description
LogEvent LogEvent

formatted message

Source code in cloudwatcher/logwatcher.py
def format_message(
    self,
    regex: Optional[str] = None,
    fmt_str_log: Optional[str] = None,
    fmt_str_datetime: Optional[str] = None,
) -> "LogEvent":
    """
    Strip the timestamp embedded in the message and prepend a UTC one.

    Args:
        regex (Optional[str]): pattern matching the embedded timestamp
        fmt_str_log (Optional[str]): format string for the log line
        fmt_str_datetime (Optional[str]): format string for the datetime

    Returns:
        LogEvent: a new event carrying the reformatted message
    """
    if regex is None:
        regex = r"^\[\d+-\d+-\d+\s\d+:\d+:\d+(.|,)\d+(\]|\s-\s\w+\])"
    if fmt_str_log is None:
        fmt_str_log = "[{time} UTC] {message}"
    if fmt_str_datetime is None:
        fmt_str_datetime = "%d-%m-%Y %H:%M:%S"
    match = re.search(regex, self.message)
    # Drop the matched prefix when an embedded timestamp is found.
    body = self.message if match is None else self.message[match.end() :]
    new_message = fmt_str_log.format(
        time=self.timestamp.strftime(fmt_str_datetime), message=body.strip()
    )
    return LogEvent(message=new_message, timestamp=self.timestamp)

from_response(response) classmethod

Create a LogEvent object from a response

Parameters:

Name Type Description Default
response Dict[str, Any]

The response from AWS

required

Returns:

Name Type Description
LogEvent LogEvent

The LogEvent object

Source code in cloudwatcher/logwatcher.py
@classmethod
def from_response(cls, response: Dict[str, Any]) -> "LogEvent":
    """
    Create a LogEvent object from a response

    Args:
        response (Dict[str, Any]): The response from AWS

    Returns:
        LogEvent: The LogEvent object
    """
    # AWS reports the timestamp in epoch milliseconds; divide by 1000 for seconds.
    # NOTE(review): fromtimestamp() uses the local timezone — confirm UTC is intended.
    return cls(
        message=response["message"],
        timestamp=datetime.fromtimestamp(response["timestamp"] / 1000),
    )

LogEventsList

Bases: BaseModel

A class for AWS CloudWatch log events list

Attributes:

Name Type Description
events List[LogEvent]

The list of log events

next_forward_token Optional[str]

The next forward token

next_backward_token Optional[str]

The next backward token

Source code in cloudwatcher/logwatcher.py
class LogEventsList(BaseModel):
    """
    A class for AWS CloudWatch log events list

    Attributes:
        events (List[LogEvent]): The list of log events
        next_forward_token (Optional[str]): The next forward token
        next_backward_token (Optional[str]): The next backward token
    """

    events: List[LogEvent]
    next_forward_token: Optional[str]
    next_backward_token: Optional[str]

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> "LogEventsList":
        """
        Build a LogEventsList from a raw AWS response.

        Args:
            response (Dict[str, Any]): the response dict from AWS

        Returns:
            LogEventsList: the parsed list of log events
        """
        parsed_events = [LogEvent.from_response(event) for event in response["events"]]
        # Pagination tokens may be absent; .get() defaults them to None.
        return cls(
            events=parsed_events,
            next_forward_token=response.get("nextForwardToken"),
            next_backward_token=response.get("nextBackwardToken"),
        )

    def format_messages(
        self,
        regex: Optional[str] = None,
        fmt_str_datetime: Optional[str] = None,
        fmt_str_log: Optional[str] = None,
    ) -> "LogEventsList":
        """
        Reformat every event's message in place (see LogEvent.format_message).

        Args:
            regex (Optional[str]): pattern matching the embedded timestamp
            fmt_str_log (Optional[str]): format string for the log line
            fmt_str_datetime (Optional[str]): format string for the datetime

        Returns:
            LogEventsList: this object, with its events reformatted
        """
        formatted = []
        for event in self.events:
            formatted.append(
                event.format_message(
                    regex=regex,
                    fmt_str_datetime=fmt_str_datetime,
                    fmt_str_log=fmt_str_log,
                )
            )
        self.events = formatted
        return self

    def __bool__(self) -> bool:
        """
        Report whether any log events are present.

        Returns:
            bool: True if the events list is not empty
        """
        return len(self.events) > 0

__bool__()

Return True if the events list is not empty

Returns:

Name Type Description
bool bool

True if the events list is not empty

Source code in cloudwatcher/logwatcher.py
def __bool__(self) -> bool:
    """
    Report whether any log events are present.

    Returns:
        bool: True if the events list is not empty
    """
    # A list with no events is treated as falsy.
    return len(self.events) > 0

format_messages(regex=None, fmt_str_datetime=None, fmt_str_log=None)

Format the messages by removing the embedded timestamp and adding a UTC timestamp

Parameters:

Name Type Description Default
regex str

regex to match the timestamp in the message

None
fmt_str_log str

format string for the log message

None
fmt_str_datetime str

format string for the datetime

None

Returns:

Name Type Description
LogEventsList LogEventsList

The LogEventsList object, with formatted messages

Source code in cloudwatcher/logwatcher.py
def format_messages(
    self,
    regex: Optional[str] = None,
    fmt_str_datetime: Optional[str] = None,
    fmt_str_log: Optional[str] = None,
) -> "LogEventsList":
    """
    Reformat every event's message in place (see LogEvent.format_message).

    Args:
        regex (Optional[str]): pattern matching the embedded timestamp
        fmt_str_log (Optional[str]): format string for the log line
        fmt_str_datetime (Optional[str]): format string for the datetime

    Returns:
        LogEventsList: this object, with its events reformatted
    """
    formatted = []
    for event in self.events:
        formatted.append(
            event.format_message(
                regex=regex,
                fmt_str_datetime=fmt_str_datetime,
                fmt_str_log=fmt_str_log,
            )
        )
    self.events = formatted
    return self

from_response(response) classmethod

Create a LogEventsList object from a response

Parameters:

Name Type Description Default
response Dict[str, Any]

The response from AWS

required

Returns:

Name Type Description
LogEventsList LogEventsList

The LogEventsList object

Source code in cloudwatcher/logwatcher.py
@classmethod
def from_response(cls, response: Dict[str, Any]) -> "LogEventsList":
    """
    Create a LogEventsList object from a response

    Args:
        response (Dict[str, Any]): The response from AWS

    Returns:
        LogEventsList: The LogEventsList object
    """
    # Each raw event dict is parsed into a LogEvent; pagination tokens
    # may be absent from the response, in which case .get() yields None.
    return cls(
        events=[LogEvent.from_response(event) for event in response["events"]],
        next_forward_token=response.get("nextForwardToken"),
        next_backward_token=response.get("nextBackwardToken"),
    )