From d0aeabae6da7d07dde469eee7a7958323988ffe8 Mon Sep 17 00:00:00 2001 From: petergardfjall Date: Tue, 7 Mar 2017 19:42:27 +0100 Subject: [PATCH] add --max-retries option to support retrying failed downloads with exponential backoff --- Makefile | 3 + garminbackup.py | 24 ++++- garminexport/backup.py | 44 ++++---- garminexport/retryer.py | 223 ++++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 + tests/__init__.py | 0 tests/test_retryer.py | 178 ++++++++++++++++++++++++++++++++ 7 files changed, 451 insertions(+), 25 deletions(-) create mode 100644 garminexport/retryer.py create mode 100644 tests/__init__.py create mode 100644 tests/test_retryer.py diff --git a/Makefile b/Makefile index 6db4117..de61a00 100644 --- a/Makefile +++ b/Makefile @@ -8,3 +8,6 @@ init: clean: find -name '*~' -exec rm {} \; find -name '*pyc' -exec rm {} \; + +test: + nosetests --verbose --with-coverage --cover-package=garminexport --cover-branches diff --git a/garminbackup.py b/garminbackup.py index 576faa7..82ef3e0 100755 --- a/garminbackup.py +++ b/garminbackup.py @@ -6,10 +6,13 @@ The backups are incremental, meaning that only activities that aren't already stored in the backup directory will be downloaded. """ import argparse +from datetime import timedelta import getpass from garminexport.garminclient import GarminClient import garminexport.backup from garminexport.backup import export_formats +from garminexport.retryer import ( + Retryer, ExponentialBackoffDelayStrategy, MaxRetriesStopStrategy) import logging import os import re @@ -28,6 +31,8 @@ LOG_LEVELS = { } """Command-line (string-based) log-level mapping to logging module levels.""" +DEFAULT_MAX_RETRIES = 7 +"""The default maximum number of retries to make when fetching a single activity.""" if __name__ == "__main__": @@ -59,6 +64,9 @@ if __name__ == "__main__": parser.add_argument( "-E", "--ignore-errors", action='store_true', help="Ignore errors and keep going. 
Default: FALSE") + parser.add_argument( + "--max-retries", metavar="NUM", default=DEFAULT_MAX_RETRIES, + type=int, help="The maximum number of retries to make on failed attempts to fetch an activity. Exponential backoff will be used, meaning that the delay between successive attempts will double with every retry, starting at one second. DEFAULT: %d" % DEFAULT_MAX_RETRIES) args = parser.parse_args() if not args.log_level in LOG_LEVELS: @@ -77,10 +85,18 @@ if __name__ == "__main__": if not args.password: args.password = getpass.getpass("Enter password: ") + # set up a retryer that will handle retries of failed activity + # downloads + retryer = Retryer( + delay_strategy=ExponentialBackoffDelayStrategy( + initial_delay=timedelta(seconds=1)), + stop_strategy=MaxRetriesStopStrategy(args.max_retries)) + + with GarminClient(args.username, args.password) as client: # get all activity ids and timestamps from Garmin account log.info("scanning activities for %s ...", args.username) - activities = set(client.list_activities()) + activities = set(retryer.call(client.list_activities)) log.info("account has a total of %d activities", len(activities)) missing_activities = garminexport.backup.need_backup( @@ -94,11 +110,11 @@ if __name__ == "__main__": for index, activity in enumerate(missing_activities): id, start = activity - log.info("backing up activity %d from %s (%d out of %d) ..." % - (id, start, index+1, len(missing_activities))) + log.info("backing up activity %d from %s (%d out of %d) ..." 
% (id, start, index+1, len(missing_activities))) try: garminexport.backup.download( - client, activity, args.backup_dir, args.format) + client, activity, retryer, args.backup_dir, + args.format) except Exception as e: log.error(u"failed with exception: %s", e) if not args.ignore_errors: diff --git a/garminexport/backup.py b/garminexport/backup.py index b053c79..18b49ee 100644 --- a/garminexport/backup.py +++ b/garminexport/backup.py @@ -36,7 +36,7 @@ def export_filename(activity, export_format): to be exported to a given format. Exported files follow this pattern: ``__``. For example: ``2015-02-17T05:45:00+00:00_123456789.tcx`` - + :param activity: An activity tuple `(id, starttime)` :type activity: tuple of `(int, datetime)` :param export_format: The export format (see :attr:`export_formats`) @@ -47,8 +47,8 @@ def export_filename(activity, export_format): """ fn = "{time}_{id}{suffix}".format( id=activity[0], - time=activity[1].isoformat(), - suffix=format_suffix[export_format]) + time=activity[1].isoformat(), + suffix=format_suffix[export_format]) return fn.replace(':','_') if os.name=='nt' else fn @@ -78,7 +78,7 @@ def need_backup(activities, backup_dir, export_formats=None): need_backup.add(activity) return need_backup - + def _not_found_activities(backup_dir): # consider all entries in /.not_found as backed up # (or rather, as tried but failed back ups) @@ -90,10 +90,10 @@ def _not_found_activities(backup_dir): log.debug("%d tried but failed activities in %s", len(failed_activities), _not_found) return failed_activities - - -def download(client, activity, backup_dir, export_formats=None): + + +def download(client, activity, retryer, backup_dir, export_formats=None): """Exports a Garmin Connect activity to a given set of formats and saves the resulting file(s) to a given backup directory. 
In case a given format cannot be exported for the activity, the @@ -106,6 +106,9 @@ def download(client, activity, backup_dir, export_formats=None): :type client: :class:`garminexport.garminclient.GarminClient` :param activity: An activity tuple `(id, starttime)` :type activity: tuple of `(int, datetime)` + :param retryer: A :class:`garminexport.retryer.Retryer` instance that + will handle failed download attempts. + :type retryer: :class:`garminexport.retryer.Retryer` :param backup_dir: Backup directory path (assumed to exist already). :type backup_dir: str :keyword export_formats: Which format(s) to export to. Could be any @@ -113,19 +116,20 @@ def download(client, activity, backup_dir, export_formats=None): :type export_formats: list of str """ id = activity[0] - + if 'json_summary' in export_formats: log.debug("getting json summary for %s", id) - activity_summary = client.get_activity_summary(id) + + activity_summary = retryer.call(client.get_activity_summary, id) dest = os.path.join( backup_dir, export_filename(activity, 'json_summary')) with codecs.open(dest, encoding="utf-8", mode="w") as f: f.write(json.dumps( activity_summary, ensure_ascii=False, indent=4)) - + if 'json_details' in export_formats: log.debug("getting json details for %s", id) - activity_details = client.get_activity_details(id) + activity_details = retryer.call(client.get_activity_details, id) dest = os.path.join( backup_dir, export_filename(activity, 'json_details')) with codecs.open(dest, encoding="utf-8", mode="w") as f: @@ -133,32 +137,32 @@ def download(client, activity, backup_dir, export_formats=None): activity_details, ensure_ascii=False, indent=4)) not_found_path = os.path.join(backup_dir, not_found_file) - with open(not_found_path, mode="a") as not_found: + with open(not_found_path, mode="a") as not_found: if 'gpx' in export_formats: log.debug("getting gpx for %s", id) - activity_gpx = client.get_activity_gpx(id) + activity_gpx = retryer.call(client.get_activity_gpx, id) dest = 
os.path.join( - backup_dir, export_filename(activity, 'gpx')) + backup_dir, export_filename(activity, 'gpx')) if activity_gpx is None: not_found.write(os.path.basename(dest) + "\n") else: with codecs.open(dest, encoding="utf-8", mode="w") as f: f.write(activity_gpx) - + if 'tcx' in export_formats: log.debug("getting tcx for %s", id) - activity_tcx = client.get_activity_tcx(id) + activity_tcx = retryer.call(client.get_activity_tcx, id) dest = os.path.join( - backup_dir, export_filename(activity, 'tcx')) + backup_dir, export_filename(activity, 'tcx')) if activity_tcx is None: not_found.write(os.path.basename(dest) + "\n") else: with codecs.open(dest, encoding="utf-8", mode="w") as f: f.write(activity_tcx) - + if 'fit' in export_formats: log.debug("getting fit for %s", id) - activity_fit = client.get_activity_fit(id) + activity_fit = retryer.call(client.get_activity_fit, id) dest = os.path.join( backup_dir, export_filename(activity, 'fit')) if activity_fit is None: @@ -166,5 +170,3 @@ def download(client, activity, backup_dir, export_formats=None): else: with open(dest, mode="wb") as f: f.write(activity_fit) - - diff --git a/garminexport/retryer.py b/garminexport/retryer.py new file mode 100644 index 0000000..df98218 --- /dev/null +++ b/garminexport/retryer.py @@ -0,0 +1,223 @@ +import abc + +from datetime import datetime +from datetime import timedelta +import logging +import time + +log = logging.getLogger(__name__) + +class GaveUpError(Exception): + """Raised by a :class:`Retryer` that has exceeded its maximum number + of retries.""" + pass + + +class DelayStrategy(object): + """Used by a :class:`Retryer` to determine how long to wait after an + attempt before the next retry. """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def next_delay(self, attempts): + """Returns the time to wait before the next attempt. + + :param attempts: The total number of (failed) attempts performed thus + far.
+ :type attempts: int + + :return: The delay before the next attempt. + :rtype: `timedelta` + """ + pass + + +class FixedDelayStrategy(DelayStrategy): + """A retry :class:`DelayStrategy` that produces a fixed delay between + attempts.""" + def __init__(self, delay): + """ + :param delay: Attempt delay. + :type delay: `timedelta` + """ + self.delay = delay + + def next_delay(self, attempts): + return self.delay + + +class ExponentialBackoffDelayStrategy(DelayStrategy): + """A retry :class:`DelayStrategy` that produces exponentially longer + delay between every attempt. The first attempt will be followed + by a `initial_delay * 2**0` delay. The following delays will be + `initial_delay * 2**1`, `initial_delay * 2**2`, and so on ... + """ + + def __init__(self, initial_delay): + """ + :param initial_delay: Initial delay. + :type initial_delay: `timedelta` + """ + self.initial_delay = initial_delay + + def next_delay(self, attempts): + if attempts <= 0: + return timedelta(seconds=0) + delay_seconds = self.initial_delay.total_seconds() * 2 ** (attempts - 1) + return timedelta(seconds=delay_seconds) + + +class NoDelayStrategy(FixedDelayStrategy): + """A retry :class:`DelayStrategy` that doesn't introduce any delay between + attempts.""" + def __init__(self): + super(NoDelayStrategy, self).__init__(timedelta(seconds=0)) + + + + +class ErrorStrategy(object): + """Used by a :class:`Retryer` to determine which errors are to be + suppressed and which errors are to be re-raised and thereby end the + (re)trying.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def should_suppress(self, error): + """Called after an attempt that raised an exception to determine if + that error should be suppressed (continue retrying) or be re-raised + (and end the retrying). + + :param error: Error that was raised from an attempt.
+ """ + pass + + +class SuppressAllErrorStrategy(ErrorStrategy): + """An :class:`ErrorStrategy` that suppresses all types of errors raised + on attempts to perform the call.""" + + def should_suppress(self, error): + return True + + +class StopStrategy(object): + """Determines for how long a :class:`Retryer` should keep (re)trying.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def should_continue(self, attempts, elapsed_time): + """Called after a failed attempt to determine if we should keep trying. + + :param attempts: Total number of (failed) attempts thus far. + :type attempts: int + :param elapsed_time: Total elapsed time since first attempt. + :type elapsed_time: timedelta + + :return: `True` if the `Retryer` should keep trying, `False` otherwise. + :rtype: bool + """ + pass + + +class NeverStopStrategy(StopStrategy): + """A :class:`StopStrategy` that never gives up.""" + def should_continue(self, attempts, elapsed_time): + return True + + +class MaxRetriesStopStrategy(StopStrategy): + """A :class:`StopStrategy` that gives up after a certain number of + retries.""" + def __init__(self, max_retries): + self.max_retries = max_retries + + def should_continue(self, attempts, elapsed_time): + return attempts <= self.max_retries + + +class Retryer(object): + """A :class:`Retryer` makes repeated calls to a function until either + the return value satisfies a certain condition (`returnval_predicate`) + or until a stop strategy (`stop_strategy`) determines that enough + attempts have been made (or a too long time has elapsed). Should the + `stop_strategy` decide to abort, a :class:`GaveUpError` is raised. + + The delay between attempts is controlled by a `delay_strategy`. + + Should the attempted call raise an Exception, an `error_strategy` gets + to decide if the error should be suppressed or re-raised (in which case + the retrying ends with that error). 
+ """ + def __init__( + self, + returnval_predicate=lambda returnval: True, + delay_strategy=NoDelayStrategy(), + stop_strategy=NeverStopStrategy(), + error_strategy=SuppressAllErrorStrategy()): + """Creates a new :class:`Retryer` set up to use a given set of + strategies to control its behavior. + + With only default values, the retryer will keep retrying + indefinitely until a value (any value) is returned by + the called function. Any raised errors will be suppressed. + + :param returnval_predicate: predicate that determines if a return + value is considered successful. When the predicate evaluates to + `True`, the `call` function will return with that return value. + :type returnval_predicate: `function(returnvalue) => bool` + :param delay_strategy: determines the time delay to introduce between + attempts. + :type delay_strategy: :class:`DelayStrategy` + :param stop_strategy: determines when we are to stop retrying. + :type stop_strategy: :class:`StopStrategy` + :param error_strategy: determines which errors (if any) to suppress + when raised by the called function. + :type error_strategy: :class:`ErrorStrategy` + """ + self.returnval_predicate = returnval_predicate + self.delay_strategy = delay_strategy + self.stop_strategy = stop_strategy + self.error_strategy = error_strategy + + + def call(self, function, *args, **kw): + """Calls the given `function`, with the given arguments, repeatedly + until either (1) a satisfactory result is obtained (as indicated by + the `returnval_predicate`), or (2) until the `stop_strategy` + determines that no more attempts are to be made (results in a + `GaveUpError`), or (3) until the called function raises an error + that is not suppressed by the `error_strategy` (the call will raise + that error). + + :param function: A `callable`. + :param args: Any positional arguments to call `function` with. + :param kw: Any keyword arguments to call `function` with.
+ """ + name = function.__name__ + start = datetime.now() + attempts = 0 + while True: + try: + attempts += 1 + log.info('{%s}: attempt %d ...', name, attempts) + returnval = function(*args, **kw) + if self.returnval_predicate(returnval): + # return value satisfies predicate, we're done! + log.debug('{%s}: success: "%s"', name, returnval) + return returnval + log.debug('{%s}: failed: return value: %s', name, returnval) + except Exception as e: + if not self.error_strategy.should_suppress(e): + raise e + log.debug('{%s}: failed: error: %s', name, str(e)) + elapsed_time = datetime.now() - start + # should we make another attempt? + if not self.stop_strategy.should_continue(attempts, elapsed_time): + raise GaveUpError( + '{%s}: gave up after %d failed attempt(s)' % + (name, attempts)) + delay = self.delay_strategy.next_delay(attempts) + log.info('{%s}: waiting %d seconds for next attempt' % + (name, delay.total_seconds())) + time.sleep(delay.total_seconds()) diff --git a/requirements.txt b/requirements.txt index de51fd6..2a57eaf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ requests==2.9.1 python-dateutil==2.4.1 + +nose==1.3.7 +coverage==4.2 +mock==2.0.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_retryer.py b/tests/test_retryer.py new file mode 100644 index 0000000..b3720fd --- /dev/null +++ b/tests/test_retryer.py @@ -0,0 +1,178 @@ +from datetime import datetime +from datetime import timedelta +import logging +import time +import unittest + +from garminexport.retryer import ( + Retryer, + NoDelayStrategy, FixedDelayStrategy, ExponentialBackoffDelayStrategy, + SuppressAllErrorStrategy, + NeverStopStrategy +) + +class Counter(object): + """An object whose `next_value` method returns increasing values.""" + + def __init__(self, start_at=0): + self.nextval = start_at + + def next_value(self): + current = self.nextval + self.nextval += 1 + return current + + +class 
FailNTimesThenReturn(object): + """An object whose `next_value` method fails N-1 times and then, on the Nth + call, returns a value.""" + + def __init__(self, calls_until_success, returnval): + self.called = 0 + self.calls_until_success = calls_until_success + self.returnval = returnval + + def next_value(self): + self.called += 1 + if self.called < self.calls_until_success: + raise RuntimeError("boom!") + return self.returnval + + + +class TestRetryer(unittest.TestCase): + """Exercise `Retryer`.""" + + + def test_with_defaults(self): + """Default `Retryer` behavior is to keep trying until a(ny) value is + returned.""" + failing_client = FailNTimesThenReturn(10, "success!") + returnval = Retryer().call(failing_client.next_value) + self.assertEqual(returnval, "success!") + self.assertEqual(failing_client.called, 10) + + + def test_with_returnval_predicate(self): + """`Retryer` should only return when the returnval_predicate says so.""" + retryer = Retryer(returnval_predicate=lambda r: r == 20) + self.assertEqual(retryer.call(Counter().next_value), 20) + + def test_function_with_positional_args(self): + """`Retryer` should be able to call a function with positional args.""" + # TODO + pass + + def test_function_with_positional_and_kw_args(self): + """`Retryer` should be able to call a function with keyword args.""" + # TODO + pass + + + def test_bla(self): + retryer = Retryer() + func = lambda : int(time.time()) + + returnval = retryer.call(func) + print returnval + + +class TestFixedDelayStrategy(unittest.TestCase): + """Exercise `FixedDelayStrategy`.""" + + def setUp(self): + # object under test + self.strategy = FixedDelayStrategy(timedelta(seconds=10)) + + def test_calculate_delay(self): + """`FixedDelayStrategy` should always return the same delay.""" + self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=10)) + self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=10)) + self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=10))
+ self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=10)) + self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=10)) + self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=10)) + + +class TestNoDelayStrategy(unittest.TestCase): + """Exercise `NoDelayStrategy`.""" + + def setUp(self): + # object under test + self.strategy = NoDelayStrategy() + + def test_calculate_delay(self): + """`NoDelayStrategy` should always return no delay.""" + self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=0)) + + +class TestExponentialBackoffDelayStrategy(unittest.TestCase): + """Exercise `ExponentialBackoffDelayStrategy`.""" + + def setUp(self): + # object under test + self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=1)) + + def test_calculate_delay(self): + """`ExponentialBackoffDelayStrategy` should return exponentially increasing delay.""" + self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0)) + self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=1)) + self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2)) + self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=4)) + self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=8)) + self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=16)) + self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=512)) + + def test_initial_delay(self): + """The initial delay is used to scale the series of delays.""" + self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=2)) + self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0)) + 
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=2*1)) + self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2*2)) + self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=2*4)) + self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=2*8)) + self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=2*16)) + self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=2*512)) + + +class TestSuppressAllErrorStrategy(unittest.TestCase): + """Exercise `SuppressAllErrorStrategy`.""" + + def setUp(self): + # object under test + self.strategy = SuppressAllErrorStrategy() + + def test_suppress(self): + """`SuppressAllErrorStrategy` should always suppress.""" + self.assertTrue(self.strategy.should_suppress(RuntimeError("boom!"))) + self.assertTrue(self.strategy.should_suppress(Exception("boom!"))) + # non-exception error + self.assertTrue(self.strategy.should_suppress("boom!")) + self.assertTrue(self.strategy.should_suppress(None)) + + +class TestNeverStopStrategy(unittest.TestCase): + """Exercise `NeverStopStrategy`.""" + + def setUp(self): + # object under test + self.strategy = NeverStopStrategy() + + def test_suppress(self): + """`NeverStopStrategy` should always continue.""" + self.assertTrue(self.strategy.should_continue(1, timedelta(seconds=1))) + self.assertTrue(self.strategy.should_continue(2, timedelta(seconds=4))) + self.assertTrue(self.strategy.should_continue(3, timedelta(seconds=4))) + self.assertTrue(self.strategy.should_continue(4, timedelta(seconds=5))) + self.assertTrue(self.strategy.should_continue(400, timedelta(hours=1))) + self.assertTrue(self.strategy.should_continue(4000, timedelta(hours=8))) + +if __name__ == '__main__': + logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG) + + unittest.main()