add --max-retries option to support retrying failed downloads with exponential backoff

This commit is contained in:
petergardfjall 2017-03-07 19:42:27 +01:00
parent 8d62829e6d
commit d0aeabae6d
7 changed files with 451 additions and 25 deletions

View File

@ -8,3 +8,6 @@ init:
clean:
find -name '*~' -exec rm {} \;
find -name '*pyc' -exec rm {} \;
test:
nosetests --verbose --with-coverage --cover-package=garminexport --cover-branches

View File

@ -6,10 +6,13 @@ The backups are incremental, meaning that only activities that aren't already
stored in the backup directory will be downloaded.
"""
import argparse
from datetime import timedelta
import getpass
from garminexport.garminclient import GarminClient
import garminexport.backup
from garminexport.backup import export_formats
from garminexport.retryer import (
Retryer, ExponentialBackoffDelayStrategy, MaxRetriesStopStrategy)
import logging
import os
import re
@ -28,6 +31,8 @@ LOG_LEVELS = {
}
"""Command-line (string-based) log-level mapping to logging module levels."""
DEFAULT_MAX_RETRIES = 7
"""The default maximum number of retries to make when fetching a single activity."""
if __name__ == "__main__":
@ -59,6 +64,9 @@ if __name__ == "__main__":
parser.add_argument(
"-E", "--ignore-errors", action='store_true',
help="Ignore errors and keep going. Default: FALSE")
parser.add_argument(
"--max-retries", metavar="NUM", default=DEFAULT_MAX_RETRIES,
type=int, help="The maximum number of retries to make on failed attempts to fetch an activity. Exponential backoff will be used, meaning that the delay between successive attempts will double with every retry, starting at one second. DEFAULT: %d" % DEFAULT_MAX_RETRIES)
args = parser.parse_args()
if not args.log_level in LOG_LEVELS:
@ -77,10 +85,18 @@ if __name__ == "__main__":
if not args.password:
args.password = getpass.getpass("Enter password: ")
# set up a retryer that will handle retries of failed activity
# downloads
retryer = Retryer(
delay_strategy=ExponentialBackoffDelayStrategy(
initial_delay=timedelta(seconds=1)),
stop_strategy=MaxRetriesStopStrategy(args.max_retries))
with GarminClient(args.username, args.password) as client:
# get all activity ids and timestamps from Garmin account
log.info("scanning activities for %s ...", args.username)
activities = set(client.list_activities())
activities = set(retryer.call(client.list_activities))
log.info("account has a total of %d activities", len(activities))
missing_activities = garminexport.backup.need_backup(
@ -94,11 +110,11 @@ if __name__ == "__main__":
for index, activity in enumerate(missing_activities):
id, start = activity
log.info("backing up activity %d from %s (%d out of %d) ..." %
(id, start, index+1, len(missing_activities)))
log.info("backing up activity %d from %s (%d out of %d) ..." % (id, start, index+1, len(missing_activities)))
try:
garminexport.backup.download(
client, activity, args.backup_dir, args.format)
client, activity, retryer, args.backup_dir,
args.format)
except Exception as e:
log.error(u"failed with exception: %s", e)
if not args.ignore_errors:

View File

@ -36,7 +36,7 @@ def export_filename(activity, export_format):
to be exported to a given format. Exported files follow this pattern:
``<timestamp>_<activity_id>_<suffix>``.
For example: ``2015-02-17T05:45:00+00:00_123456789.tcx``
:param activity: An activity tuple `(id, starttime)`
:type activity: tuple of `(int, datetime)`
:param export_format: The export format (see :attr:`export_formats`)
@ -47,8 +47,8 @@ def export_filename(activity, export_format):
"""
fn = "{time}_{id}{suffix}".format(
id=activity[0],
time=activity[1].isoformat(),
suffix=format_suffix[export_format])
time=activity[1].isoformat(),
suffix=format_suffix[export_format])
return fn.replace(':','_') if os.name=='nt' else fn
@ -78,7 +78,7 @@ def need_backup(activities, backup_dir, export_formats=None):
need_backup.add(activity)
return need_backup
def _not_found_activities(backup_dir):
# consider all entries in <backup_dir>/.not_found as backed up
# (or rather, as tried but failed back ups)
@ -90,10 +90,10 @@ def _not_found_activities(backup_dir):
log.debug("%d tried but failed activities in %s",
len(failed_activities), _not_found)
return failed_activities
def download(client, activity, backup_dir, export_formats=None):
def download(client, activity, retryer, backup_dir, export_formats=None):
"""Exports a Garmin Connect activity to a given set of formats
and saves the resulting file(s) to a given backup directory.
In case a given format cannot be exported for the activity, the
@ -106,6 +106,9 @@ def download(client, activity, backup_dir, export_formats=None):
:type client: :class:`garminexport.garminclient.GarminClient`
:param activity: An activity tuple `(id, starttime)`
:type activity: tuple of `(int, datetime)`
:param retryer: A :class:`garminexport.retryer.Retryer` instance that
will handle failed download attempts.
:type retryer: :class:`garminexport.retryer.Retryer`
:param backup_dir: Backup directory path (assumed to exist already).
:type backup_dir: str
:keyword export_formats: Which format(s) to export to. Could be any
@ -113,19 +116,20 @@ def download(client, activity, backup_dir, export_formats=None):
:type export_formats: list of str
"""
id = activity[0]
if 'json_summary' in export_formats:
log.debug("getting json summary for %s", id)
activity_summary = client.get_activity_summary(id)
activity_summary = retryer.call(client.get_activity_summary, id)
dest = os.path.join(
backup_dir, export_filename(activity, 'json_summary'))
with codecs.open(dest, encoding="utf-8", mode="w") as f:
f.write(json.dumps(
activity_summary, ensure_ascii=False, indent=4))
if 'json_details' in export_formats:
log.debug("getting json details for %s", id)
activity_details = client.get_activity_details(id)
activity_details = retryer.call(client.get_activity_details, id)
dest = os.path.join(
backup_dir, export_filename(activity, 'json_details'))
with codecs.open(dest, encoding="utf-8", mode="w") as f:
@ -133,32 +137,32 @@ def download(client, activity, backup_dir, export_formats=None):
activity_details, ensure_ascii=False, indent=4))
not_found_path = os.path.join(backup_dir, not_found_file)
with open(not_found_path, mode="a") as not_found:
with open(not_found_path, mode="a") as not_found:
if 'gpx' in export_formats:
log.debug("getting gpx for %s", id)
activity_gpx = client.get_activity_gpx(id)
activity_gpx = retryer.call(client.get_activity_gpx, id)
dest = os.path.join(
backup_dir, export_filename(activity, 'gpx'))
backup_dir, export_filename(activity, 'gpx'))
if activity_gpx is None:
not_found.write(os.path.basename(dest) + "\n")
else:
with codecs.open(dest, encoding="utf-8", mode="w") as f:
f.write(activity_gpx)
if 'tcx' in export_formats:
log.debug("getting tcx for %s", id)
activity_tcx = client.get_activity_tcx(id)
activity_tcx = retryer.call(client.get_activity_tcx, id)
dest = os.path.join(
backup_dir, export_filename(activity, 'tcx'))
backup_dir, export_filename(activity, 'tcx'))
if activity_tcx is None:
not_found.write(os.path.basename(dest) + "\n")
else:
with codecs.open(dest, encoding="utf-8", mode="w") as f:
f.write(activity_tcx)
if 'fit' in export_formats:
log.debug("getting fit for %s", id)
activity_fit = client.get_activity_fit(id)
activity_fit = retryer.call(client.get_activity_fit, id)
dest = os.path.join(
backup_dir, export_filename(activity, 'fit'))
if activity_fit is None:
@ -166,5 +170,3 @@ def download(client, activity, backup_dir, export_formats=None):
else:
with open(dest, mode="wb") as f:
f.write(activity_fit)

223
garminexport/retryer.py Normal file
View File

@ -0,0 +1,223 @@
import abc
from datetime import datetime
from datetime import timedelta
import logging
import time
log = logging.getLogger(__name__)
class GaveUpError(Exception):
"""Raised by a :class:`Retryer` that has exceeded its maximum number
of retries."""
pass
class DelayStrategy(object):
"""Used by a :class:`Retryer` to determines how long to wait after an
attempt before the next retry. """
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def next_delay(self, attempts):
"""Returns the time to wait before the next attempt.
:param attempts: The total number of (failed) attempts performed thus
far.
:type attempts: int
:return: The delay before the next attempt.
:rtype: `timedelta`
"""
pass
class FixedDelayStrategy(DelayStrategy):
"""A retry :class:`DelayStrategy` that produces a fixed delay between
attempts."""
def __init__(self, delay):
"""
:param delay: Attempt delay.
:type delay: `timedelta`
"""
self.delay = delay
def next_delay(self, attempts):
return self.delay
class ExponentialBackoffDelayStrategy(DelayStrategy):
"""A retry :class:`DelayStrategy` that produces exponentially longer
delay between every attempt. The first attempt will be followed
by a `<initial-delay> * 2**0` delay. The following delays will be
`<initial-delay> * 2**1`, `<initial-delay> * 2**2`, and so on ...
"""
def __init__(self, initial_delay):
"""
:param initial_delay: Initial delay.
:type delay: `timedelta`
"""
self.initial_delay = initial_delay
def next_delay(self, attempts):
if attempts <= 0:
return timedelta(seconds=0)
delay_seconds = self.initial_delay.total_seconds() * 2 ** (attempts - 1)
return timedelta(seconds=delay_seconds)
class NoDelayStrategy(FixedDelayStrategy):
"""A retry :class:`DelayStrategy` that doesn't introduce any delay between
attempts."""
def __init__(self):
super(NoDelayStrategy, self).__init__(timedelta(seconds=0))
class ErrorStrategy(object):
"""Used by a :class:`Retryer` to determine which errors are to be
suppressed and which errors are to be re-raised and thereby end the
(re)trying."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def should_suppress(self, error):
"""Called after an attempt that raised an exception to determine if
that error should be suppressed (continue retrying) or be re-raised
(and end the retrying).
:param error: Error that was raised from an attempt.
"""
pass
class SuppressAllErrorStrategy(ErrorStrategy):
"""An :class:`ErrorStrategy` that suppresses all types of errors raised
on attempts to perform the call."""
def should_suppress(self, error):
return True
class StopStrategy(object):
"""Determines for how long a :class:`Retryer` should keep (re)trying."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def should_continue(self, attempts, elapsed_time):
"""Called after a failed attempt to determine if we should keep trying.
:param attempts: Total number of (failed) attempts thus far.
:type attempts: int
:param elapsed_time: Total elapsed time since first attempt.
:type elapsed_time: timedelta
:return: `True` if the `Retryer` should keep trying, `False` otherwise.
:rtype: bool
"""
pass
class NeverStopStrategy(StopStrategy):
"""A :class:`StopStrategy` that never gives up."""
def should_continue(self, attempts, elapsed_time):
return True
class MaxRetriesStopStrategy(StopStrategy):
"""A :class:`StopStrategy` that gives up after a certain number of
retries."""
def __init__(self, max_retries):
self.max_retries = max_retries
def should_continue(self, attempts, elapsed_time):
return attempts <= self.max_retries
class Retryer(object):
"""A :class:`Retryer` makes repeated calls to a function until either
the return value satisfies a certain condition (`returnval_predicate`)
or until a stop strategy (`stop_strategy`) determines that enough
attempts have been made (or a too long time has elapsed). Should the
`stop_strategy` decide to abort, a :class:`GaveUpError` is raised.
The delay between attempts is controlled by a `delay_strategy`.
Should the attempted call raise an Exception, an `error_strategy` gets
to decide if the error should be suppressed or re-raised (in which case
the retrying ends with that error).
"""
def __init__(
self,
returnval_predicate=lambda returnval: True,
delay_strategy=NoDelayStrategy(),
stop_strategy=NeverStopStrategy(),
error_strategy=SuppressAllErrorStrategy()):
"""Creates a new :class:`Retryer` set up to use a given set of
strategies to control its behavior.
With only default values, the retryer will keep retrying
indefinitely until a value (any value) is returned by
the called function. Any raised errors will be suppressed.
:param returnval_predicate: predicate that determines if a return
value is considered successful. When the predicate evaluates to
`True`, the `call` function will return with that return value.
:type returnval_predicate: `function(returnvalue) => bool`
:param delay_strategy: determines the time delay to introduce between
attempts.
:type delay_strategy: :class:`DelayStrategy`
:param stop_strategy: determines when we are to stop retrying.
:type stop_strategy: :class:`StopStrategy`
:param error_strategy: determines which errors (if any) to suppress
when raised by the called function.
:type error_strategy: :class:`ErrorStrategy`
"""
self.returnval_predicate = returnval_predicate
self.delay_strategy = delay_strategy
self.stop_strategy = stop_strategy
self.error_strategy = error_strategy
def call(self, function, *args, **kw):
"""Calls the given `function`, with the given arguments, repeatedly
until either (1) a satisfactory result is obtained (as indicated by
the `returnval_predicate`), or (2) until the `stop_strategy`
determines that no more attempts are to be made (results in a
`GaveUpException`), or (3) until the called function raises an error
that is not suppressed by the `error_strategy` (the call will raise
that error).
:param function: A `callable`.
:param args: Any positional arguments to call `function` with.
:param kw: Any keyword arguments to call `function` with.
"""
name = function.__name__
start = datetime.now()
attempts = 0
while True:
try:
attempts += 1
log.info('{%s}: attempt %d ...', name, attempts)
returnval = function(*args, **kw)
if self.returnval_predicate(returnval):
# return value satisfies predicate, we're done!
log.debug('{%s}: success: "%s"', name, returnval)
return returnval
log.debug('{%s}: failed: return value: %s', name, returnval)
except Exception as e:
if not self.error_strategy.should_suppress(e):
raise e
log.debug('{%s}: failed: error: %s', name, str(e))
elapsed_time = datetime.now() - start
# should we make another attempt?
if not self.stop_strategy.should_continue(attempts, elapsed_time):
raise GaveUpError(
'{%s}: gave up after %d failed attempt(s)' %
(name, attempts))
delay = self.delay_strategy.next_delay(attempts)
log.info('{%s}: waiting %d seconds for next attempt' %
(name, delay.total_seconds()))
time.sleep(delay.total_seconds())

View File

@ -1,2 +1,6 @@
requests==2.9.1
python-dateutil==2.4.1
nose==1.3.7
coverage==4.2
mock==2.0.0

0
tests/__init__.py Normal file
View File

178
tests/test_retryer.py Normal file
View File

@ -0,0 +1,178 @@
from datetime import datetime
from datetime import timedelta
import logging
import time
import unittest
from garminexport.retryer import (
Retryer,
NoDelayStrategy, FixedDelayStrategy, ExponentialBackoffDelayStrategy,
SuppressAllErrorStrategy,
NeverStopStrategy
)
class Counter(object):
"""An object whose `next_value` method returns increasing values."""
def __init__(self, start_at=0):
self.nextval = start_at
def next_value(self):
current = self.nextval
self.nextval += 1
return current
class FailNTimesThenReturn(object):
"""An object whose `next_value` method fails N times and then, on the Nth
attempt, returns a value."""
def __init__(self, calls_until_success, returnval):
self.called = 0
self.calls_until_success = calls_until_success
self.returnval = returnval
def next_value(self):
self.called += 1
if self.called < self.calls_until_success:
raise RuntimeError("boom!")
return self.returnval
class TestRetryer(unittest.TestCase):
"""Exercise `Retryer`."""
def test_with_defaults(self):
"""Default `Retryer` behavior is to keep trying until a(ny) value is
returned."""
failing_client = FailNTimesThenReturn(10, "success!")
returnval = Retryer().call(failing_client.next_value)
self.assertEqual(returnval, "success!")
self.assertEqual(failing_client.called, 10)
def test_with_returnval_predicate(self):
"""`Retryer` should only return when the returnval_predicate says so."""
retryer = Retryer(returnval_predicate=lambda r: r == 20)
self.assertEqual(retryer.call(Counter().next_value), 20)
def test_function_with_positional_args(self):
"""`Retryer` should be able to call a function with positional args."""
# TODO
pass
def test_function_with_positional_and_kw_args(self):
"""`Retryer` should be able to call a function with keyword args."""
# TODO
pass
def test_bla(self):
retryer = Retryer()
func = lambda : int(time.time())
returnval = retryer.call(func)
print returnval
class TestFixedDelayStrategy(unittest.TestCase):
"""Exercise `FixedDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = FixedDelayStrategy(timedelta(seconds=10))
def test_calculate_delay(self):
"""`FixedDelayStrategy` should always return the same delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=10))
class TestNoDelayStrategy(unittest.TestCase):
"""Exercise `NoDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = NoDelayStrategy()
def test_calculate_delay(self):
"""`NoDelayStrategy` should always return no delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=0))
class TestExponentialBackoffDelayStrategy(unittest.TestCase):
"""Exercise `ExponentialBackoffDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=1))
def test_calculate_delay(self):
"""`ExponentialBackoffDelayStrategy` should return exponentially increasing delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=1))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=4))
self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=8))
self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=16))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=512))
def test_initial_delay(self):
"""The initial delay is used to scale the series of delays."""
self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=2))
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=2*1))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2*2))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=2*4))
self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=2*8))
self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=2*16))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=2*512))
class TestSuppressAllErrorStrategy(unittest.TestCase):
"""Exercise `SuppressAllErrorStrategy`."""
def setUp(self):
# object under test
self.strategy = SuppressAllErrorStrategy()
def test_suppress(self):
"""`SuppressAllErrorStrategy` should always suppress."""
self.assertTrue(self.strategy.should_suppress(RuntimeError("boom!")))
self.assertTrue(self.strategy.should_suppress(Exception("boom!")))
# non-exception error
self.assertTrue(self.strategy.should_suppress("boom!"))
self.assertTrue(self.strategy.should_suppress(None))
class TestNeverStopStrategy(unittest.TestCase):
"""Exercise `NeverStopStrategy`"""
def setUp(self):
# object under test
self.strategy = NeverStopStrategy()
def test_suppress(self):
"""`SuppressAllErrorStrategy` should always suppress."""
self.assertTrue(self.strategy.should_continue(1, timedelta(seconds=1)))
self.assertTrue(self.strategy.should_continue(2, timedelta(seconds=4)))
self.assertTrue(self.strategy.should_continue(3, timedelta(seconds=4)))
self.assertTrue(self.strategy.should_continue(4, timedelta(seconds=5)))
self.assertTrue(self.strategy.should_continue(400, timedelta(hours=1)))
self.assertTrue(self.strategy.should_continue(4000, timedelta(hours=8)))
if __name__ == '__main__':
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)
unittest.main()