from datetime import datetime
from datetime import timedelta
import logging
import time
import unittest

from garminexport.retryer import (
    Retryer,
    NoDelayStrategy, FixedDelayStrategy, ExponentialBackoffDelayStrategy,
    SuppressAllErrorStrategy,
    NeverStopStrategy
)


class Counter(object):
    """An object whose `next_value` method returns increasing values.

    Every call to `next_value` hands back the current counter value and then
    advances the counter by one, so successive calls yield `start_at`,
    `start_at + 1`, `start_at + 2`, and so on.
    """

    def __init__(self, start_at=0):
        """Create a counter whose first returned value is `start_at`."""
        self.nextval = start_at

    def next_value(self):
        """Return the current counter value, then advance the counter by one."""
        current = self.nextval
        self.nextval += 1
        return current
|
||
|
|
||
|
|
||
|
class FailNTimesThenReturn(object):
    """An object whose `next_value` method fails on the first N-1 calls and
    then, from the Nth call onwards, returns a value.

    N is given by `calls_until_success`. The number of calls made so far
    (failed or not) is tracked in the `called` attribute.
    """

    def __init__(self, calls_until_success, returnval):
        """Set up the failing callable.

        :param calls_until_success: The 1-based call number at which
          `next_value` starts to succeed.
        :param returnval: The value returned by `next_value` once it succeeds.
        """
        self.called = 0
        self.calls_until_success = calls_until_success
        self.returnval = returnval

    def next_value(self):
        """Record the call and either fail or return `returnval`.

        :raises RuntimeError: While fewer than `calls_until_success` calls
          (including this one) have been made.
        """
        self.called += 1
        # Calls 1..N-1 fail; the Nth call (and any later one) succeeds.
        if self.called < self.calls_until_success:
            raise RuntimeError("boom!")
        return self.returnval
|
||
|
|
||
|
|
||
|
|
||
|
class TestRetryer(unittest.TestCase):
    """Exercise `Retryer`."""

    def test_with_defaults(self):
        """Default `Retryer` behavior is to keep trying until a(ny) value is
        returned."""
        failing_client = FailNTimesThenReturn(10, "success!")
        returnval = Retryer().call(failing_client.next_value)
        self.assertEqual(returnval, "success!")
        # The function under retry must have been invoked exactly until the
        # first successful call.
        self.assertEqual(failing_client.called, 10)

    def test_with_returnval_predicate(self):
        """`Retryer` should only return when the returnval_predicate says so."""
        retryer = Retryer(returnval_predicate=lambda r: r == 20)
        self.assertEqual(retryer.call(Counter().next_value), 20)

    def test_function_with_positional_args(self):
        """`Retryer` should be able to call a function with positional args."""
        # TODO: implement. Reported as skipped so that the placeholder does
        # not show up as a passing test.
        self.skipTest("not yet implemented")

    def test_function_with_positional_and_kw_args(self):
        """`Retryer` should be able to call a function with positional and
        keyword args."""
        # TODO: implement. Reported as skipped so that the placeholder does
        # not show up as a passing test.
        self.skipTest("not yet implemented")

    def test_bla(self):
        """A default `Retryer` should hand back the value of a function that
        succeeds on its first call."""
        def current_time():
            return int(time.time())

        returnval = Retryer().call(current_time)
        # Assert on the result rather than printing it: a `print` statement
        # is Python 2-only syntax and verifies nothing.
        self.assertIsInstance(returnval, int)
|
||
|
|
||
|
|
||
|
class TestFixedDelayStrategy(unittest.TestCase):
    """Exercise `FixedDelayStrategy`."""

    def setUp(self):
        # object under test
        self.strategy = FixedDelayStrategy(timedelta(seconds=10))

    def test_calculate_delay(self):
        """`FixedDelayStrategy` should always return the same delay."""
        expected = timedelta(seconds=10)
        # The delay must not depend on the argument passed to `next_delay`.
        for n in (0, 1, 2, 3, 10, 100):
            self.assertEqual(
                self.strategy.next_delay(n), expected,
                "unexpected delay for next_delay(%d)" % n)
|
||
|
|
||
|
|
||
|
class TestNoDelayStrategy(unittest.TestCase):
    """Exercise `NoDelayStrategy`."""

    def setUp(self):
        # object under test
        self.strategy = NoDelayStrategy()

    def test_calculate_delay(self):
        """`NoDelayStrategy` should always return no delay."""
        no_delay = timedelta(seconds=0)
        # A zero delay is expected regardless of the argument to `next_delay`.
        for n in (0, 1, 2, 3, 10, 100):
            self.assertEqual(
                self.strategy.next_delay(n), no_delay,
                "unexpected delay for next_delay(%d)" % n)
|
||
|
|
||
|
|
||
|
class TestExponentialBackoffDelayStrategy(unittest.TestCase):
    """Exercise `ExponentialBackoffDelayStrategy`."""

    # (argument to `next_delay`, expected multiple of the initial delay)
    EXPECTED_FACTORS = (
        (0, 0),
        (1, 1),
        (2, 2),
        (3, 4),
        (4, 8),
        (5, 16),
        (10, 512),
    )

    def setUp(self):
        # object under test
        self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=1))

    def test_calculate_delay(self):
        """`ExponentialBackoffDelayStrategy` should return exponentially increasing delay."""
        for n, factor in self.EXPECTED_FACTORS:
            self.assertEqual(
                self.strategy.next_delay(n), timedelta(seconds=factor),
                "unexpected delay for next_delay(%d)" % n)

    def test_initial_delay(self):
        """The initial delay is used to scale the series of delays."""
        # Replace the strategy from setUp with one using a 2 second initial
        # delay: every expected delay doubles accordingly.
        self.strategy = ExponentialBackoffDelayStrategy(timedelta(seconds=2))
        for n, factor in self.EXPECTED_FACTORS:
            self.assertEqual(
                self.strategy.next_delay(n), timedelta(seconds=2 * factor),
                "unexpected delay for next_delay(%d)" % n)
|
||
|
|
||
|
|
||
|
class TestSuppressAllErrorStrategy(unittest.TestCase):
    """Exercise `SuppressAllErrorStrategy`."""

    def setUp(self):
        # object under test
        self.strategy = SuppressAllErrorStrategy()

    def test_suppress(self):
        """`SuppressAllErrorStrategy` should always suppress."""
        errors = (
            RuntimeError("boom!"),
            Exception("boom!"),
            # non-exception errors
            "boom!",
            None,
        )
        for error in errors:
            self.assertTrue(
                self.strategy.should_suppress(error),
                "expected %r to be suppressed" % (error,))
|
||
|
|
||
|
|
||
|
class TestNeverStopStrategy(unittest.TestCase):
    """Exercise `NeverStopStrategy`."""

    def setUp(self):
        # object under test
        self.strategy = NeverStopStrategy()

    # NOTE: the method name is kept as-is so that existing test IDs remain
    # stable, even though the test is about `should_continue`.
    def test_suppress(self):
        """`NeverStopStrategy` should always say to continue."""
        argument_sets = (
            (1, timedelta(seconds=1)),
            (2, timedelta(seconds=4)),
            (3, timedelta(seconds=4)),
            (4, timedelta(seconds=5)),
            (400, timedelta(hours=1)),
            (4000, timedelta(hours=8)),
        )
        for args in argument_sets:
            self.assertTrue(
                self.strategy.should_continue(*args),
                "expected should_continue%r to be true" % (args,))
|
||
|
|
||
|
if __name__ == '__main__':
    # When run directly as a script: configure root logging at DEBUG level
    # with timestamped messages, then run every test case in this module.
    logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)

    unittest.main()
|