Demo of Subprocess Env Manager

This is the original code.

This is the version based on argsloader:

import math
import platform
from enum import Enum, unique
from pprint import pprint

from argsloader.units import cdict, cvalue, number, enum, timespan, none, yesno, non_negative


@unique
class RetryType(Enum):
    """Choices accepted by the ``retry_type`` entry of ``config_loader``.

    NOTE(review): the member names suggest "reset the existing environment"
    versus "create a fresh one" on retry -- confirm against the env manager
    that consumes this value.
    """
    RESET = 1
    RENEW = 2


@unique
class ContextType(Enum):
    """Choices accepted by the ``context`` entry of ``config_loader``.

    NOTE(review): presumably these name the process start method used by the
    subprocess env manager -- confirm against the consumer of this value.
    """
    SPAWN = 1
    FORK = 2

    @classmethod
    def default(cls):
        """Return the platform-dependent default member.

        :return: ``SPAWN`` when the lower-cased ``platform.system()`` name
            contains ``'win'``, otherwise ``FORK``.
        """
        system_name = platform.system().lower()
        # NOTE(review): this is a substring test, so besides 'windows' it also
        # matches 'darwin' (macOS) and any other name containing 'win'.
        # Confirm that is intended before tightening the check.
        return cls.SPAWN if 'win' in system_name else cls.FORK


# Configuration schema of the env manager. Each entry is built with
# ``cvalue(default, unit)``: the first argument is the value used when the key
# is not supplied, the second is the argsloader unit applied to the value.
config_loader = cdict({
    'episode_num': cvalue(math.inf, number()),  # any number; infinite by default
    'max_retry': cvalue(5, number() >> non_negative.int()),  # integer, must be >= 0
    'step_timeout': cvalue(None, timespan() | none()),  # timespan, or None
    'auto_reset': cvalue(True, yesno()),  # yes/no flag, loaded as a boolean
    'retry_type': cvalue('reset', enum(RetryType)),  # member of RetryType
    'reset_timeout': cvalue(None, timespan() | none()),  # timespan, or None
    'retry_waiting_time': cvalue(0.1, timespan()),  # timespan

    # arguments specific to the subprocess manager
    'shared_memory': cvalue(True, yesno()),  # yes/no flag
    'context': cvalue(ContextType.default(), enum(ContextType)),  # member of ContextType
    'wait_num': cvalue(2, number() >> non_negative.int()),  # integer, must be >= 0
    'step_wait_timeout': cvalue(0.01, timespan()),  # timespan, in seconds
    'connect_timeout': cvalue(60, timespan()),  # timespan, in seconds
    'reset_inplace': cvalue(False, yesno()),  # yes/no flag
})

if __name__ == '__main__':
    # Demo run: supply a few raw, string-typed overrides and pretty-print the
    # configuration produced by the loader (all other keys keep their defaults).
    overrides = {
        'connect_timeout': '3h 5min',
        'max_retry': '15',
        'retry_type': 'renew',
        'auto_reset': 'no',
    }
    loaded = config_loader.call(overrides)
    pprint(loaded, indent=4)

The result should be as follows (the default of `context` is platform-dependent; this output comes from a platform where it resolves to FORK):

{   'auto_reset': False,
    'connect_timeout': 11100.0,
    'context': <ContextType.FORK: 2>,
    'episode_num': inf,
    'max_retry': 15,
    'reset_inplace': False,
    'reset_timeout': None,
    'retry_type': <RetryType.RENEW: 2>,
    'retry_waiting_time': 0.1,
    'shared_memory': True,
    'step_timeout': None,
    'step_wait_timeout': 0.01,
    'wait_num': 2}