Loading Dockerfile 0 → 100644 +26 −0 Original line number Diff line number Diff line ARG base_image FROM ${base_image:-debian:buster} LABEL uk.org.kodo.maintainer="Dom Sekotill <dom.sekotill@kodo.org.uk>" ARG upstream_version RUN version=${upstream_version:+=$upstream_version} \ && apt-get update \ && apt-get install -y \ python3 \ python3-jinja2 \ python3-netifaces \ nginx${version} \ && rm -r /etc/nginx/nginx.conf /etc/nginx/sites-* \ && mkdir /etc/nginx/streams \ && ln -sf /dev/stdout /var/log/nginx/access.log \ && ln -sf /dev/stderr /var/log/nginx/error.log \ &&: COPY --from=lachlanevenson/k8s-kubectl:latest \ /usr/local/bin/kubectl /usr/bin/kubectl ADD monitor.py /usr/bin/monitor ADD templates /usr/share/nginx/templates ENTRYPOINT ["/usr/bin/env"] CMD ["/usr/bin/monitor"] build.sh 0 → 100644 +21 −0 Original line number Diff line number Diff line #!/bin/bash : ${PROXY_BASE_IMAGE:=debian:buster} get_latest() { docker run --rm ${PROXY_BASE_IMAGE} \ sh -c 'apt-get update >/dev/null && apt-cache show nginx' | sed -n '1,/Version:/ s/Version: //p' } get_version() { echo "${CI_COMMIT_SHORT_SHA}" } build() { docker_build \ --build-arg base_image="${PROXY_BASE_IMAGE}" \ --build-arg upstream_version="${UPSTREAM_VERSION}" \ --tag $1 } monitor.py 0 → 100755 +304 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 import argparse import collections import contextlib import hashlib import io import ipaddress import json import logging import netifaces import os import pathlib import signal import socket import subprocess from urllib import parse from typing import Iterable, Set, TextIO, Union import jinja2 NGINX_BASE = '/etc/nginx' IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] logger = logging.getLogger(__name__) class StreamConfig: """ Manage an Nginx stream configuration """ def __init__(self, name, protocol, external_port, internal_port): self.name = name self.path = None self.vals = dict( protocol=protocol, external_port=external_port, internal_port=internal_port, ) def enable(self, engine, nginx_base=NGINX_BASE): """ Add the stream config """ self.path = pathlib.Path(nginx_base).joinpath(f'streams/{self.name}.conf') template = engine.get_template('stream.conf') with self.path.open('w') as conf: conf.writelines(template.stream(**self.vals)) def disable(self): """ Remove the stream config """ if self.path.exists(): self.path.unlink() def add_core_config(engine, opts, nginx_dir=NGINX_BASE): """ Add a minimal Nginx configuration """ template = engine.get_template('nginx.conf') path = pathlib.Path(nginx_dir).joinpath('nginx.conf') with path.open('w') as conf: conf.writelines(template.stream(**opts.__dict__)) def parse_cmdline(argv=None): parser = argparse.ArgumentParser() parser.add_argument('--monitor-url', type=parse.urlparse, default=parse.urlparse('http://127.0.0.1/healthz'), ) parser.add_argument('--cert', '--certificate', type=pathlib.Path, ) parser.add_argument('--key', type=pathlib.Path, ) mutex = parser.add_mutually_exclusive_group() mutex.add_argument('--verbose', '-v', action='count', default=0, ) mutex.add_argument('--quiet', '-q', action='store_true', default=False, ) opts = parser.parse_args(argv) if opts.monitor_url.scheme == 'https': if not opts.cert: parser.error(f"{opts.monitor_url!r} requires TLS certificates with --cert and --key") elif not opts.cert.exists(): parser.error(f"{opts.cert} does not exist") elif opts.key and not opts.key.exists(): parser.error(f"{opts.key} does not exist") elif opts.cert or opts.key: parser.error("--cert and --key are not valid for {opts.monitor_url!r}") # Set verbosity logging.root.setLevel( logging.DEBUG if opts.verbose >= 1 else logging.INFO if not opts.quiet else logging.WARNING ) return opts def main(argv=None): opts = parse_cmdline(argv) # Template engine env = jinja2.Environment( loader=jinja2.FileSystemLoader('/usr/share/nginx/templates'), keep_trailing_newline=True, # It's a line ENDING people! Leave it in. ) # Minimal starting config add_core_config(env, opts) subprocess.check_call(['nginx', '-vt']) logger.info("generated minimal nginx configuration") # Begin monitoring k8s API for NodePort services cmd = ['kubectl', 'get', 'services', '--all-namespaces', '--watch', '--output', 'json'] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) logger.info("pid %s: kubectl", proc.pid) with contextlib.ExitStack() as stack: stack.enter_context(proc) stack.callback(os.kill, os.getpid(), signal.SIGTERM) @stack.push def log_exc(exc_cls, *_): if exc_cls and not issubclass(exc_cls, KeyboardInterrupt): logger.exception("") return True if os.fork(): # Parent becomes nginx logger.info("pid %s: nginx", os.getpid()) os.execvp('nginx', ['nginx']) logger.info("pid %s: monitor", os.getpid()) streams_map = collections.defaultdict(list) for uid, stream in filter_services(proc.stdout): if not uid: logging.info("reloading nginx") subprocess.check_call(['nginx', '-s', 'reload']) continue streams = streams_map[uid] if stream: stream.enable(env) streams.append(stream) logging.info("stream added: %s", stream.name) else: for stream in streams: stream.disable() logging.info("stream removed: %s", stream.name) del streams[:] logging.info("service cleared: %s", uid) def filter_services(stream: TextIO): """ Filter Service objects from stream into (unique-ID, StreamConfig) values Parses Kubernetes Service objects from a JSON stream and filters them, yielding (unique-ID, StreamConfig) tuples. See `node_ports` for the filtering information. The yielded unique-ID (uid) values are the Service uids from the Kubernetes API. If StreamConfig value is None (and the uid is not None) all the stream configurations for the service with that UID are cleared. If the uid is None, it indicates that a service has been completely handled, and any configuration changes should be committed. """ external_addrs = set(get_host_addr()) buf = io.StringIO() for line in stream: buf.write(line) if line and line != '}\n': continue buf.seek(0) service = json.load(buf) buf.seek(0) buf.truncate() yield from node_ports(service, external_addrs) yield None, None # Nginx reload if stream.closed: logger.info("kubectl stream has closed") return if not line: logger.warning("empty line from an open stream") def node_ports(service: dict, host_addrs: Set[IPAddress]): """ Yield port information from services relevant to the current node's host Returns a generator that yields (unique-ID, StreamConfig) values for any of the node host's addresses listed in the 'kodo.org.uk/port-proxy' annotation, if a service is eligible (type == NodePort AND 'kodo.org.uk/port-proxy' in .metadata.annotations). See `filter_services` for information on the yielded values. """ metadata = service['metadata'] spec = service['spec'] name = f"{metadata['name']}.{metadata['namespace']}" uid = metadata['uid'] logger.info("got a service: %s (%s)", name, uid) # Clear old streams for this service yield uid, None # TODO: On addressing #20, remove NodePort restriction if spec.get('type') != 'NodePort': logger.debug("rejecting service: not a NodePort") return try: ports = spec['ports'] # TODO: Awaiting #20 # cluster_ip = ipaddress.ip_address(spec['clusterIP']) ext_ip = metadata['annotations']['kodo.org.uk/port-proxy'] except KeyError as exc: logger.debug("rejecting service: missing %s", exc) return logger.debug("kodo.org.uk/port-proxy: %s", ext_ip) exts = set(addr_from_names(n.strip() for n in ext_ip.split(','))) exts.intersection_update(host_addrs) if not exts: logger.debug("rejecting service: no available addresses") return for ext_ip in exts: for port in ports: logger.debug("generating for %s and %s", ext_ip, port['name']) yield uid, StreamConfig( name=f"{name}.{port['name']}", protocol=port['protocol'], external_port=(ext_ip, port['port']), # TODO: Awaiting #20 # internal_port=(cluster_ip, port['port']), internal_port=(ext_ip, port['nodePort']), ) def get_host_addr() -> Iterable[IPAddress]: """ Yield all IP addresses configured on the host """ for iface in netifaces.interfaces(): for af, iface_addr in netifaces.ifaddresses(iface).items(): if af not in {socket.AF_INET, socket.AF_INET6}: continue for addr in iface_addr: try: yield ipaddress.ip_address(addr['addr']) except ValueError: if '%' not in addr['addr']: raise logger.warning("ignoring scoped IPv6 address") def addr_from_names(names: Iterable[str]) -> Iterable[IPAddress]: """ Generate IPAddress objects from a list of addresses & names Returns a generator """ for name in names: try: yield ipaddress.ip_address(name) except ValueError: for af, *_, addrinfo in socket.getaddrinfo(name, None): if af not in {socket.AF_INET, socket.AF_INET6}: continue yield ipaddress.ip_address(addrinfo[0]) if __name__ == '__main__': logging.basicConfig() logger = logging.getLogger('monitor') try: main() except KeyboardInterrupt: logger.info("exit on SIGINT") except: logger.exception("error from monitor process") templates/nginx.conf 0 → 100644 +44 −0 Original line number Diff line number Diff line user www-data; worker_processes auto; daemon off; pid /run/nginx.pid; error_log /dev/stderr {{ 'debug' if verbose >= 2 else 'info' if verbose == 1 else 'notice' if not quiet else 'warn' }}; include /etc/nginx/modules-enabled/*.conf; events { worker_connections 768; } http { server { {% set url = monitor_url -%} {% set port = url.port if url.port else 80 if url.scheme == 'http' else 443 -%} {% set flag = 'default_server' if url.scheme == 'http' else 'ssl' -%} listen {{ url.hostname }}:{{ port }} {{ flag }}; server_name _; {%- if url.scheme == 'https' %} ssl_certificate {{ ssl_certificate }}; ssl_certificate_key {{ ssl_certificate_key or ssl_certificate }}; {% endif -%} location / { return 404 'Nothing here'; } location = {{ url.path or '/' }} { return 200 'OK'; } } } stream { include /etc/nginx/streams/*.conf; } templates/stream.conf 0 → 100644 +5 −0 Original line number Diff line number Diff line server { listen {{ external_port|join(':') }} {{ 'udp' if protocol == 'UDP' else '' }}; proxy_pass {{ internal_port|join(':') }}; proxy_protocol on; } Loading
Dockerfile 0 → 100644 +26 −0 Original line number Diff line number Diff line ARG base_image FROM ${base_image:-debian:buster} LABEL uk.org.kodo.maintainer="Dom Sekotill <dom.sekotill@kodo.org.uk>" ARG upstream_version RUN version=${upstream_version:+=$upstream_version} \ && apt-get update \ && apt-get install -y \ python3 \ python3-jinja2 \ python3-netifaces \ nginx${version} \ && rm -r /etc/nginx/nginx.conf /etc/nginx/sites-* \ && mkdir /etc/nginx/streams \ && ln -sf /dev/stdout /var/log/nginx/access.log \ && ln -sf /dev/stderr /var/log/nginx/error.log \ &&: COPY --from=lachlanevenson/k8s-kubectl:latest \ /usr/local/bin/kubectl /usr/bin/kubectl ADD monitor.py /usr/bin/monitor ADD templates /usr/share/nginx/templates ENTRYPOINT ["/usr/bin/env"] CMD ["/usr/bin/monitor"]
build.sh 0 → 100644 +21 −0 Original line number Diff line number Diff line #!/bin/bash : ${PROXY_BASE_IMAGE:=debian:buster} get_latest() { docker run --rm ${PROXY_BASE_IMAGE} \ sh -c 'apt-get update >/dev/null && apt-cache show nginx' | sed -n '1,/Version:/ s/Version: //p' } get_version() { echo "${CI_COMMIT_SHORT_SHA}" } build() { docker_build \ --build-arg base_image="${PROXY_BASE_IMAGE}" \ --build-arg upstream_version="${UPSTREAM_VERSION}" \ --tag $1 }
monitor.py 0 → 100755 +304 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 import argparse import collections import contextlib import hashlib import io import ipaddress import json import logging import netifaces import os import pathlib import signal import socket import subprocess from urllib import parse from typing import Iterable, Set, TextIO, Union import jinja2 NGINX_BASE = '/etc/nginx' IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] logger = logging.getLogger(__name__) class StreamConfig: """ Manage an Nginx stream configuration """ def __init__(self, name, protocol, external_port, internal_port): self.name = name self.path = None self.vals = dict( protocol=protocol, external_port=external_port, internal_port=internal_port, ) def enable(self, engine, nginx_base=NGINX_BASE): """ Add the stream config """ self.path = pathlib.Path(nginx_base).joinpath(f'streams/{self.name}.conf') template = engine.get_template('stream.conf') with self.path.open('w') as conf: conf.writelines(template.stream(**self.vals)) def disable(self): """ Remove the stream config """ if self.path.exists(): self.path.unlink() def add_core_config(engine, opts, nginx_dir=NGINX_BASE): """ Add a minimal Nginx configuration """ template = engine.get_template('nginx.conf') path = pathlib.Path(nginx_dir).joinpath('nginx.conf') with path.open('w') as conf: conf.writelines(template.stream(**opts.__dict__)) def parse_cmdline(argv=None): parser = argparse.ArgumentParser() parser.add_argument('--monitor-url', type=parse.urlparse, default=parse.urlparse('http://127.0.0.1/healthz'), ) parser.add_argument('--cert', '--certificate', type=pathlib.Path, ) parser.add_argument('--key', type=pathlib.Path, ) mutex = parser.add_mutually_exclusive_group() mutex.add_argument('--verbose', '-v', action='count', default=0, ) mutex.add_argument('--quiet', '-q', action='store_true', default=False, ) opts = parser.parse_args(argv) if opts.monitor_url.scheme == 'https': if not opts.cert: parser.error(f"{opts.monitor_url!r} requires TLS certificates with --cert and --key") elif not opts.cert.exists(): parser.error(f"{opts.cert} does not exist") elif opts.key and not opts.key.exists(): parser.error(f"{opts.key} does not exist") elif opts.cert or opts.key: parser.error("--cert and --key are not valid for {opts.monitor_url!r}") # Set verbosity logging.root.setLevel( logging.DEBUG if opts.verbose >= 1 else logging.INFO if not opts.quiet else logging.WARNING ) return opts def main(argv=None): opts = parse_cmdline(argv) # Template engine env = jinja2.Environment( loader=jinja2.FileSystemLoader('/usr/share/nginx/templates'), keep_trailing_newline=True, # It's a line ENDING people! Leave it in. ) # Minimal starting config add_core_config(env, opts) subprocess.check_call(['nginx', '-vt']) logger.info("generated minimal nginx configuration") # Begin monitoring k8s API for NodePort services cmd = ['kubectl', 'get', 'services', '--all-namespaces', '--watch', '--output', 'json'] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) logger.info("pid %s: kubectl", proc.pid) with contextlib.ExitStack() as stack: stack.enter_context(proc) stack.callback(os.kill, os.getpid(), signal.SIGTERM) @stack.push def log_exc(exc_cls, *_): if exc_cls and not issubclass(exc_cls, KeyboardInterrupt): logger.exception("") return True if os.fork(): # Parent becomes nginx logger.info("pid %s: nginx", os.getpid()) os.execvp('nginx', ['nginx']) logger.info("pid %s: monitor", os.getpid()) streams_map = collections.defaultdict(list) for uid, stream in filter_services(proc.stdout): if not uid: logging.info("reloading nginx") subprocess.check_call(['nginx', '-s', 'reload']) continue streams = streams_map[uid] if stream: stream.enable(env) streams.append(stream) logging.info("stream added: %s", stream.name) else: for stream in streams: stream.disable() logging.info("stream removed: %s", stream.name) del streams[:] logging.info("service cleared: %s", uid) def filter_services(stream: TextIO): """ Filter Service objects from stream into (unique-ID, StreamConfig) values Parses Kubernetes Service objects from a JSON stream and filters them, yielding (unique-ID, StreamConfig) tuples. See `node_ports` for the filtering information. The yielded unique-ID (uid) values are the Service uids from the Kubernetes API. If StreamConfig value is None (and the uid is not None) all the stream configurations for the service with that UID are cleared. If the uid is None, it indicates that a service has been completely handled, and any configuration changes should be committed. """ external_addrs = set(get_host_addr()) buf = io.StringIO() for line in stream: buf.write(line) if line and line != '}\n': continue buf.seek(0) service = json.load(buf) buf.seek(0) buf.truncate() yield from node_ports(service, external_addrs) yield None, None # Nginx reload if stream.closed: logger.info("kubectl stream has closed") return if not line: logger.warning("empty line from an open stream") def node_ports(service: dict, host_addrs: Set[IPAddress]): """ Yield port information from services relevant to the current node's host Returns a generator that yields (unique-ID, StreamConfig) values for any of the node host's addresses listed in the 'kodo.org.uk/port-proxy' annotation, if a service is eligible (type == NodePort AND 'kodo.org.uk/port-proxy' in .metadata.annotations). See `filter_services` for information on the yielded values. """ metadata = service['metadata'] spec = service['spec'] name = f"{metadata['name']}.{metadata['namespace']}" uid = metadata['uid'] logger.info("got a service: %s (%s)", name, uid) # Clear old streams for this service yield uid, None # TODO: On addressing #20, remove NodePort restriction if spec.get('type') != 'NodePort': logger.debug("rejecting service: not a NodePort") return try: ports = spec['ports'] # TODO: Awaiting #20 # cluster_ip = ipaddress.ip_address(spec['clusterIP']) ext_ip = metadata['annotations']['kodo.org.uk/port-proxy'] except KeyError as exc: logger.debug("rejecting service: missing %s", exc) return logger.debug("kodo.org.uk/port-proxy: %s", ext_ip) exts = set(addr_from_names(n.strip() for n in ext_ip.split(','))) exts.intersection_update(host_addrs) if not exts: logger.debug("rejecting service: no available addresses") return for ext_ip in exts: for port in ports: logger.debug("generating for %s and %s", ext_ip, port['name']) yield uid, StreamConfig( name=f"{name}.{port['name']}", protocol=port['protocol'], external_port=(ext_ip, port['port']), # TODO: Awaiting #20 # internal_port=(cluster_ip, port['port']), internal_port=(ext_ip, port['nodePort']), ) def get_host_addr() -> Iterable[IPAddress]: """ Yield all IP addresses configured on the host """ for iface in netifaces.interfaces(): for af, iface_addr in netifaces.ifaddresses(iface).items(): if af not in {socket.AF_INET, socket.AF_INET6}: continue for addr in iface_addr: try: yield ipaddress.ip_address(addr['addr']) except ValueError: if '%' not in addr['addr']: raise logger.warning("ignoring scoped IPv6 address") def addr_from_names(names: Iterable[str]) -> Iterable[IPAddress]: """ Generate IPAddress objects from a list of addresses & names Returns a generator """ for name in names: try: yield ipaddress.ip_address(name) except ValueError: for af, *_, addrinfo in socket.getaddrinfo(name, None): if af not in {socket.AF_INET, socket.AF_INET6}: continue yield ipaddress.ip_address(addrinfo[0]) if __name__ == '__main__': logging.basicConfig() logger = logging.getLogger('monitor') try: main() except KeyboardInterrupt: logger.info("exit on SIGINT") except: logger.exception("error from monitor process")
templates/nginx.conf 0 → 100644 +44 −0 Original line number Diff line number Diff line user www-data; worker_processes auto; daemon off; pid /run/nginx.pid; error_log /dev/stderr {{ 'debug' if verbose >= 2 else 'info' if verbose == 1 else 'notice' if not quiet else 'warn' }}; include /etc/nginx/modules-enabled/*.conf; events { worker_connections 768; } http { server { {% set url = monitor_url -%} {% set port = url.port if url.port else 80 if url.scheme == 'http' else 443 -%} {% set flag = 'default_server' if url.scheme == 'http' else 'ssl' -%} listen {{ url.hostname }}:{{ port }} {{ flag }}; server_name _; {%- if url.scheme == 'https' %} ssl_certificate {{ ssl_certificate }}; ssl_certificate_key {{ ssl_certificate_key or ssl_certificate }}; {% endif -%} location / { return 404 'Nothing here'; } location = {{ url.path or '/' }} { return 200 'OK'; } } } stream { include /etc/nginx/streams/*.conf; }
templates/stream.conf 0 → 100644 +5 −0 Original line number Diff line number Diff line server { listen {{ external_port|join(':') }} {{ 'udp' if protocol == 'UDP' else '' }}; proxy_pass {{ internal_port|join(':') }}; proxy_protocol on; }