Source code for plainbox.impl.secure.launcher1

# This file is part of Checkbox.
#
# Copyright 2013 Canonical Ltd.
# Written by:
#   Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.

#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`plainbox.impl.secure.launcher1` -- plainbox-trusted-launcher-1
====================================================================
"""

import argparse
import copy
import logging
import os
import subprocess

from plainbox.i18n import gettext as _
from plainbox.impl.job import JobDefinition
from plainbox.impl.resource import Resource
from plainbox.impl.unit.template import TemplateUnit
from plainbox.impl.secure.origin import JobOutputTextSource
from plainbox.impl.secure.providers.v1 import all_providers
from plainbox.impl.secure.rfc822 import load_rfc822_records, RFC822SyntaxError
from plainbox.impl.session import SessionManager


[docs]class TrustedLauncher: """ Trusted Launcher for v1 jobs. """ def __init__(self): """ Initialize a new instance of the trusted launcher """ self._job_list = []
[docs] def add_job_list(self, job_list): """ Add jobs to the trusted launcher """ self._job_list.extend(job_list)
[docs] def find_job(self, checksum): for job in self._job_list: if job.checksum == checksum: return job else: raise LookupError( _("Cannot find job with checksum {}").format(checksum))
[docs] def modify_execution_environment(self, target_env): """ Modify the job execution environment with a new set of values. It's mandatory to do this way to keep variables automatically set by pkexec(1) when the org.freedesktop.policykit.exec.allow_gui annotation is set. It will allow the trusted launcher to run X11 applications as another user since the $DISPLAY and $XAUTHORITY environment variables will be retained. """ ptl_env = dict(os.environ) if target_env: ptl_env.update(target_env) return ptl_env
[docs] def run_shell_from_job(self, checksum, env): """ Run a job with the given checksum. :param checksum: The checksum of the job to execute. :param env: Environment to execute the job in. :returns: The return code of the command :raises LookupError: If the checksum does not match any known job """ job = self.find_job(checksum) cmd = [job.shell, '-c', job.command] return subprocess.call(cmd, env=self.modify_execution_environment(env))
[docs] def run_generator_job(self, checksum, env): """ Run a job with and process the stdout to get a job definition. :param checksum: The checksum of the job to execute :param env: Environment to execute the job in. :returns: A list of job definitions that were processed from the output. :raises LookupError: If the checksum does not match any known job """ job = self.find_job(checksum) cmd = [job.shell, '-c', job.command] output = subprocess.check_output( cmd, universal_newlines=True, env=self.modify_execution_environment(env)) job_list = [] source = JobOutputTextSource(job) try: record_list = load_rfc822_records(output, source=source) except RFC822SyntaxError as exc: logging.error( _("Syntax error in record generated from %s: %s"), job, exc) else: if job.plugin == 'resource': resource_list = [] for record in record_list: resource = Resource(record.data) resource_list.append(resource) for plugin in all_providers.get_all_plugins(): for u in plugin.plugin_object.unit_list: if ( isinstance(u, TemplateUnit) and u.resource_id == job.id ): logging.info(_("Instantiating unit: %s"), u) for new_unit in u.instantiate_all(resource_list): job_list.append(new_unit) return job_list
[docs]class UpdateAction(argparse.Action): """ Argparse action that builds up a dictionary. This action is similar to the built-in append action but it constructs a dictionary instead of a list. """ def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None): if nargs == 0: raise ValueError('nargs for append actions must be > 0; if arg ' 'strings are not supplying the value to append, ' 'the append const action may be more appropriate') if const is not None and nargs != argparse.OPTIONAL: raise ValueError( 'nargs must be {!r} to supply const'.format(argparse.OPTIONAL)) super().__init__( option_strings=option_strings, dest=dest, nargs=nargs, const=const, default=default, type=type, choices=choices, required=required, help=help, metavar=metavar) def __call__(self, parser, namespace, values, option_string=None): """ Internal method of argparse.Action This method is invoked to "apply" the action after seeing all the values for a given argument. Please refer to argparse source code for information on how it is used. """ items = copy.copy(argparse._ensure_value(namespace, self.dest, {})) for value in values: try: k, v = value.split('=', 1) except ValueError: raise argparse.ArgumentError(self, "expected NAME=VALUE") else: items[k] = v setattr(namespace, self.dest, items)
[docs]def get_parser_for_sphinx(): parser = argparse.ArgumentParser( prog="plainbox-trusted-launcher-1", description=_("Security elevation mechanism for plainbox")) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( '-w', '--warmup', action='store_true', # TRANSLATORS: don't translate pkexec(1) help=_('return immediately, only useful when used with pkexec(1)')) group.add_argument( '-t', '--target', metavar=_('CHECKSUM'), help=_('run a job with this checksum')) group = parser.add_argument_group(_("target job specification")) group.add_argument( '-T', '--target-environment', metavar=_('NAME=VALUE'), dest='target_env', nargs='+', action=UpdateAction, help=_('environment passed to the target job')) group = parser.add_argument_group(title=_("generator job specification")) group.add_argument( '-g', '--generator', metavar=_('CHECKSUM'), # TRANSLATORS: don't translate 'resource' in the sentence below. It # denotes a special type of job. help=_('also run a job with this checksum (assuming it is a resource' ' job)')) group.add_argument( '-G', '--generator-environment', dest='generator_env', nargs='+', metavar=_('NAME=VALUE'), action=UpdateAction, help=_('environment passed to the generator job')) return parser
[docs]def main(argv=None): """ Entry point for the plainbox-trusted-launcher-1 :param argv: Command line arguments to parse. If None (default) then sys.argv is used instead. :returns: The return code of the job that was selected with the --target argument or zero if the --warmup argument was specified. :raises: SystemExit if --taget or --generator point to unknown jobs. The trusted launcher is a sudo-like program, that can grant unprivileged users permission to run something as root, that is restricted to executing shell snippets embedded inside job definitions offered by v1 plainbox providers. As a security measure the trusted launcher only considers job providers listed in the system-wide directory since one needs to be root to add additional definitions there anyway. Unlike the rest of plainbox, the trusted launcher does not produce job results, instead it just literally executes the shell snippet and returns stdout/stderr unaffected to the invoking process. The exception to this rule is the way --via argument is handled, where the trusted launcher needs to capture stdout to interpret that as job definitions. Unlike sudo, the trusted launcher is not a setuid program and cannot grant root access in itself. Instead it relies on a policykit and specifically on pkexec(1) alongside with an appropriate policy file, to grant users a way to run trusted-launcher as root (or another user). """ parser = get_parser_for_sphinx() ns = parser.parse_args(argv) # Just quit if warming up if ns.warmup: return 0 launcher = TrustedLauncher() # Siphon all jobs from all secure providers otherwise all_providers.load() for plugin in all_providers.get_all_plugins(): launcher.add_job_list(plugin.plugin_object.job_list) # Run the generator job and feed the result back to the launcher if ns.generator: try: generated_job_list = launcher.run_generator_job( ns.generator, ns.generator_env) launcher.add_job_list(generated_job_list) except LookupError as exc: raise SystemExit(str(exc)) # Add siblings jobs with SessionManager.get_throwaway_manager( all_providers.get_all_plugin_objects()) as m: launcher.add_job_list(m.state.job_list) # Run the target job and return the result code try: return launcher.run_shell_from_job(ns.target, ns.target_env) except LookupError as exc: raise SystemExit(str(exc))
if __name__ == "__main__": main()
comments powered by Disqus