diff --git a/octodns/source/envvar.py b/octodns/source/envvar.py new file mode 100644 index 0000000..b755632 --- /dev/null +++ b/octodns/source/envvar.py @@ -0,0 +1,105 @@ + +import logging +import os + +from ..record import Record +from .base import BaseSource + + +class EnvVarSourceException(Exception): + pass + + +class EnvironmentVariableNotFoundException(EnvVarSourceException): + def __init__(self, data): + super(EnvironmentVariableNotFoundException, self).__init__( + 'Unknown environment variable {}'.format(data)) + + +class EnvVarSource(BaseSource): + ''' + This source allows for environment variables to be embedded at octodns + execution time into zones. Intended to capture artifacts of deployment to + facilitate operational objectives. + + The TXT record generated will only have a single value. + + The record name cannot conflict with any other co-existing sources. If + this occurs, an exception will be thrown. + + Possible use cases include: + - Embedding a version number into a TXT record to monitor update + propagation across authoritative providers. + - Capturing identifying information about the deployment process to + record where and when the zone was updated. + + version: + class: octodns.source.envvar.EnvVarSource + # The environment variable in question, in this example the username + # currently executing octodns + variable: USER + # The TXT record name to embed the value found at the above + # environment variable + record: deployuser + # The TTL of the TXT record (optional, default 60) + ttl: 3600 + + This source is then combined with other sources in the octodns config + file: + + zones: + netflix.com.: + sources: + - yaml + - version + targets: + - ultra + - ns1 + ''' + SUPPORTS_GEO = False + SUPPORTS_DYNAMIC = False + SUPPORTS = set(('TXT')) + + DEFAULT_TTL = 60 + + def __init__(self, id, variable, record, ttl=DEFAULT_TTL): + self.log = logging.getLogger('{}[{}]'.format( + self.__class__.__name__, id)) + self.log.debug('__init__: id=%s, variable=%s, record=%s, ' + 'ttl=%d', id, variable, record, ttl) + super(EnvVarSource, self).__init__(id) + self.envvar = variable + self.record = record + self.ttl = ttl + self.value = None + + def _read_variable(self): + self.value = os.environ.get(self.envvar) + if self.value is None: + raise EnvironmentVariableNotFoundException(self.envvar) + + self.log.debug('_read_variable: successfully loaded var=%s val=%s', + self.envvar, self.value) + + def populate(self, zone, target=False, lenient=False): + self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name, + target, lenient) + + # if target: + # TODO: Environment Variable Source cannot act as a target, + # throw exception? + # return + + before = len(zone.records) + + self._read_variable() + + # We don't need to worry about conflicting records here because the + # manager will deconflict sources on our behalf. + payload = {'ttl': self.ttl, 'type': 'TXT', 'values': [self.value]} + record = Record.new(zone, self.record, payload, source=self, + lenient=lenient) + zone.add_record(record, lenient=lenient) + + self.log.info('populate: found %s records, exists=False', + len(zone.records) - before) diff --git a/tests/test_octodns_source_envvar.py b/tests/test_octodns_source_envvar.py new file mode 100644 index 0000000..e562aa0 --- /dev/null +++ b/tests/test_octodns_source_envvar.py @@ -0,0 +1,34 @@ +from six import text_type +from unittest import TestCase +from unittest.mock import patch + +from octodns.source.envvar import EnvVarSource +from octodns.source.envvar import EnvironmentVariableNotFoundException +from octodns.zone import Zone + + +class TestEnvVarSource(TestCase): + + def test_read_variable(self): + envvar = 'OCTODNS_TEST_ENVIRONMENT_VARIABLE' + source = EnvVarSource('testid', envvar, 'recordname', ttl=120) + with self.assertRaises(EnvironmentVariableNotFoundException) as ctx: + source._read_variable() + msg = 'Unknown environment variable {}'.format(envvar) + self.assertEquals(msg, text_type(ctx.exception)) + + with patch.dict('os.environ', {envvar: 'testvalue'}): + source._read_variable() + self.assertEquals(source.value, 'testvalue') + + def test_populate(self): + envvar = 'TEST_VAR' + value = 'somevalue' + record = 'testrecord' + source = EnvVarSource('testid', envvar, record) + zone = Zone('unit.tests.', []) + + with patch.dict('os.environ', {envvar: value}): + source.populate(zone) + + # TODO: Validate zone and record