garminexport/tests/test_retryer.py
2018-06-06 22:41:03 +02:00

179 lines
6.8 KiB
Python

from datetime import datetime
from datetime import timedelta
import logging
import time
import unittest
from garminexport.retryer import (
Retryer,
NoDelayStrategy, FixedDelayStrategy, ExponentialBackoffDelayStrategy,
SuppressAllErrorStrategy,
NeverStopStrategy
)
class Counter(object):
"""An object whose `next_value` method returns increasing values."""
def __init__(self, start_at=0):
self.nextval = start_at
def next_value(self):
current = self.nextval
self.nextval += 1
return current
class FailNTimesThenReturn(object):
"""An object whose `next_value` method fails N times and then, on the Nth
attempt, returns a value."""
def __init__(self, calls_until_success, returnval):
self.called = 0
self.calls_until_success = calls_until_success
self.returnval = returnval
def next_value(self):
self.called += 1
if self.called < self.calls_until_success:
raise RuntimeError("boom!")
return self.returnval
class TestRetryer(unittest.TestCase):
"""Exercise `Retryer`."""
def test_with_defaults(self):
"""Default `Retryer` behavior is to keep trying until a(ny) value is
returned."""
failing_client = FailNTimesThenReturn(10, "success!")
returnval = Retryer().call(failing_client.next_value)
self.assertEqual(returnval, "success!")
self.assertEqual(failing_client.called, 10)
def test_with_returnval_predicate(self):
"""`Retryer` should only return when the returnval_predicate says so."""
retryer = Retryer(returnval_predicate=lambda r: r == 20)
self.assertEqual(retryer.call(Counter().next_value), 20)
def test_function_with_positional_args(self):
"""`Retryer` should be able to call a function with positional args."""
# TODO
pass
def test_function_with_positional_and_kw_args(self):
"""`Retryer` should be able to call a function with keyword args."""
# TODO
pass
def test_bla(self):
retryer = Retryer()
func = lambda : int(time.time())
returnval = retryer.call(func)
print(returnval)
class TestFixedDelayStrategy(unittest.TestCase):
"""Exercise `FixedDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = FixedDelayStrategy(timedelta(seconds=10))
def test_calculate_delay(self):
"""`FixedDelayStrategy` should always return the same delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=10))
self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=10))
class TestNoDelayStrategy(unittest.TestCase):
"""Exercise `NoDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = NoDelayStrategy()
def test_calculate_delay(self):
"""`NoDelayStrategy` should always return no delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(100), timedelta(seconds=0))
class TestExponentialBackoffDelayStrategy(unittest.TestCase):
"""Exercise `ExponentialBackoffDelayStrategy`."""
def setUp(self):
# object under test
self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=1))
def test_calculate_delay(self):
"""`ExponentialBackoffDelayStrategy` should return exponentially increasing delay."""
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=1))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=4))
self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=8))
self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=16))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=512))
def test_initial_delay(self):
"""The initial delay is used to scale the series of delays."""
self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=2))
self.assertEqual(self.strategy.next_delay(0), timedelta(seconds=0))
self.assertEqual(self.strategy.next_delay(1), timedelta(seconds=2*1))
self.assertEqual(self.strategy.next_delay(2), timedelta(seconds=2*2))
self.assertEqual(self.strategy.next_delay(3), timedelta(seconds=2*4))
self.assertEqual(self.strategy.next_delay(4), timedelta(seconds=2*8))
self.assertEqual(self.strategy.next_delay(5), timedelta(seconds=2*16))
self.assertEqual(self.strategy.next_delay(10), timedelta(seconds=2*512))
class TestSuppressAllErrorStrategy(unittest.TestCase):
"""Exercise `SuppressAllErrorStrategy`."""
def setUp(self):
# object under test
self.strategy = SuppressAllErrorStrategy()
def test_suppress(self):
"""`SuppressAllErrorStrategy` should always suppress."""
self.assertTrue(self.strategy.should_suppress(RuntimeError("boom!")))
self.assertTrue(self.strategy.should_suppress(Exception("boom!")))
# non-exception error
self.assertTrue(self.strategy.should_suppress("boom!"))
self.assertTrue(self.strategy.should_suppress(None))
class TestNeverStopStrategy(unittest.TestCase):
"""Exercise `NeverStopStrategy`"""
def setUp(self):
# object under test
self.strategy = NeverStopStrategy()
def test_suppress(self):
"""`SuppressAllErrorStrategy` should always suppress."""
self.assertTrue(self.strategy.should_continue(1, timedelta(seconds=1)))
self.assertTrue(self.strategy.should_continue(2, timedelta(seconds=4)))
self.assertTrue(self.strategy.should_continue(3, timedelta(seconds=4)))
self.assertTrue(self.strategy.should_continue(4, timedelta(seconds=5)))
self.assertTrue(self.strategy.should_continue(400, timedelta(hours=1)))
self.assertTrue(self.strategy.should_continue(4000, timedelta(hours=8)))
if __name__ == '__main__':
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)
unittest.main()