#!/usr/bin/python
# Copyright (c) 2005-2009 Canonical Ltd
#
# AUTHOR:
# Michael Vogt
#
# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

import apt_inst
import apt_pkg

import sys
import os
import string
import datetime
import ConfigParser

from StringIO import StringIO
from optparse import OptionParser
from subprocess import Popen, PIPE

# the filter has to be installed before "apt" is imported so that the
# FutureWarning it emits at import time is suppressed
import warnings
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
import apt
import logging
import subprocess

import gettext
from gettext import gettext as _


class UnattendedUpgradesCache(apt.Cache):
    """apt.Cache that re-targets candidate versions to allowed origins.

    The adjustment is applied on construction and again after every
    clear(), because clear() resets the cache state.
    """

    def __init__(self, rootdir, allowed_origins):
        """Open the cache below rootdir and adjust the candidates.

        allowed_origins is a sequence of (origin, archive) pairs as
        understood by is_allowed_origin().
        """
        apt.Cache.__init__(self, rootdir=rootdir)
        self.allowed_origins = allowed_origins
        # ensure we update the candidate versions
        self.adjust_candidate_versions()

    def clear(self):
        """Clear all markings and re-apply the candidate adjustment."""
        apt.Cache.clear(self)
        # ensure we update the candidate versions
        self.adjust_candidate_versions()

    def adjust_candidate_versions(self):
        """ Adjust candidate versions to match highest allowed origin

            This adjusts the origin even if the candidate has a higher
            version
        """
        for pkg in self:
            # important! this avoids downgrades below
            if not pkg.is_upgradable:
                continue
            # check if we have a version in a allowed origin that is
            # not the candidate; the first match wins (this relies on
            # pkg.versions yielding the highest version first)
            new_cand = None
            for ver in pkg.versions:
                if is_allowed_origin(ver, self.allowed_origins):
                    # leave as soon as we have the highest new candidate
                    new_cand = ver
                    break
            if new_cand and new_cand != pkg.candidate:
                logging.debug("adjusting candidate version: '%s'" % new_cand)
                pkg.candidate = new_cand


def is_allowed_origin(ver, allowed_origins):
    """Return True if any origin of ver matches an allowed origin.

    ver may be None (or otherwise false), in which case the result is
    False. Each entry of allowed_origins is indexed as
    (origin, archive).
    """
    if not ver:
        return False
    for origin in ver.origins:
        for allowed in allowed_origins:
            if origin.origin == allowed[0] and origin.archive == allowed[1]:
                return True
    return False


def check_changes_for_sanity(cache, allowed_origins, blacklist):
    """Return True if the current markings in cache are acceptable.

    The markings are rejected when the depcache reports broken
    packages, when any package is marked for removal, or when a package
    marked for install/upgrade comes from a not allowed origin, is
    blacklisted or is on hold.
    """
    if cache._depcache.BrokenCount != 0:
        return False
    for pkg in cache:
        if pkg.markedDelete:
            logging.debug("pkg '%s' now marked delete" % pkg.name)
            return False
        if pkg.markedInstall or pkg.markedUpgrade:
            if not is_allowed_origin(pkg.candidate, allowed_origins):
                logging.debug("pkg '%s' not in allowed origin" % pkg.name)
                return False
            if pkg.name in blacklist:
                logging.debug("pkg '%s' blacklisted" % pkg.name)
                return False
            if pkg._pkg.SelectedState == apt_pkg.SelStateHold:
                logging.debug("pkg '%s' is on hold" % pkg.name)
                return False
    return True


def pkgname_from_deb(debfile):
    """Return the package name stored in the control data of debfile.

    If the deb can not be read the name is guessed from the file name
    (everything in front of the first "_").
    """
    # FIXME: add error checking here
    try:
        with open(debfile) as deb:
            control = apt_inst.debExtractControl(deb)
        sections = apt_pkg.ParseSection(control)
        return sections["Package"]
    except (IOError, SystemError) as e:
        logging.error("failed to read deb file '%s' (%s)" % (debfile, e))
        # dumb fallback; strip the directory first, otherwise the guessed
        # "name" would still carry the path of the archive directory
        return os.path.basename(debfile).split("_")[0]


def _md5sum_of_conffile_in_deb(debfile, conf_file):
    """Return the md5sum of conf_file as shipped inside debfile.

    Runs the equivalent of
    "dpkg-deb --fsys-tarfile DEB | tar -x -O -f - ./FILE | md5sum".
    """
    dpkg_cmd = ["dpkg-deb", "--fsys-tarfile", debfile]
    tar_cmd = ["tar", "-x", "-O", "-f", "-", "." + conf_file]
    md5_cmd = ["md5sum"]
    dpkg_p = Popen(dpkg_cmd, stdout=PIPE)
    tar_p = Popen(tar_cmd, stdin=dpkg_p.stdout, stdout=PIPE)
    md5_p = Popen(md5_cmd, stdin=tar_p.stdout, stdout=PIPE)
    # the children own the pipe ends now; closing our copies lets an
    # upstream process notice when its reader goes away
    dpkg_p.stdout.close()
    tar_p.stdout.close()
    output = md5_p.communicate()[0]
    # reap the helpers so that no zombies are left behind
    tar_p.wait()
    dpkg_p.wait()
    return output.split()[0]


# prefix is *only* needed for the built-in tests
def conffile_prompt(destFile, prefix=""):
    """Return True if installing the deb destFile would cause a dpkg
    conffile prompt.

    The "Conffiles" entry of the installed package is read from the dpkg
    status file. A prompt is assumed when, for a conffile that exists on
    disk and is also listed in the conffiles of the deb, both the
    installed file and the file shipped in the deb have a md5sum that
    differs from the one recorded in the status file.
    """
    logging.debug("check_conffile_prompt('%s')" % destFile)
    pkgname = pkgname_from_deb(destFile)
    status_file = apt_pkg.Config.Find("Dir::State::status")
    with open(status_file, "r") as status:
        parse = apt_pkg.ParseTagFile(status)
        while parse.Step() == 1:
            if parse.Section.get("Package") != pkgname:
                continue
            logging.debug("found pkg: %s" % pkgname)
            if not parse.Section.has_key("Conffiles"):
                continue
            conffiles = parse.Section.get("Conffiles")
            # Conffiles:
            # /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
            for line in conffiles.split("\n"):
                logging.debug("conffile line: %s", line)
                fields = line.split()
                # skip empty or malformed lines instead of crashing
                if len(fields) < 2:
                    continue
                conf_file = fields[0]
                md5 = fields[1]
                if len(fields) > 2:
                    obs = fields[2]
                else:
                    obs = None
                # ignore if conffile is obsolete or does not exist
                if (obs == "obsolete" or
                        not os.path.exists(prefix + conf_file)):
                    continue
                # ignore state "newconffile" until its clearer if there
                # might be a dpkg prompt (LP: #936870)
                if md5 == "newconffile":
                    continue
                # get conffile value from pkg
                with open(destFile) as deb:
                    pkg_conffiles = apt_inst.debExtractControl(
                        deb, "conffiles")
                if (not pkg_conffiles or
                        conf_file not in pkg_conffiles.split("\n")):
                    logging.debug("'%s' not in package conffiles '%s'" % (
                        conf_file, pkg_conffiles))
                    continue
                # test against the installed file
                with open(prefix + conf_file) as installed:
                    current_md5 = apt_pkg.md5sum(installed.read())
                logging.debug("current md5: %s" % current_md5)
                # hashes are the same, no conffile prompt
                if current_md5 == md5:
                    continue
                # calculate md5sum from the deb (may take a bit)
                pkg_md5sum = _md5sum_of_conffile_in_deb(destFile, conf_file)
                logging.debug("pkg_md5sum: %s" % pkg_md5sum)
                # the md5sum in the deb is unchanged, this will not
                # trigger a conffile prompt
                if pkg_md5sum == md5:
                    continue
                # if we made it to this point:
                #  current_md5 != pkg_md5sum != md5
                # and that will trigger a conffile prompt, we can
                # stop processing at this point and just return True
                return True
    return False


def dpkg_conffile_prompt():
    """Return False if DPkg::Options contains --force-confold or
    --force-confnew, True otherwise (including when the option list is
    not configured at all).
    """
    if not apt_pkg.Config.has_key("DPkg::Options"):
        return True
    options = apt_pkg.Config.ValueList("DPkg::Options")
    for option in options:
        if option.strip() in ("--force-confold", "--force-confnew"):
            return False
    return True


def rewind_cache(cache, pkgs_to_upgrade):
    " set the cache back to the state with packages_to_upgrade "
    cache.clear()
    for pkg2 in pkgs_to_upgrade:
        pkg2.markUpgrade()


def host():
    " return the node name as reported by uname "
    return os.uname()[1]


# *sigh* textwrap is nice, but it breaks "linux-image" into two
# separate lines
def wrap(t, width=70, subsequent_indent=""):
    """Word-wrap t at whitespace only.

    A line break followed by subsequent_indent is inserted before a
    word that would make the current line longer than width. Every word
    (including the last one) is followed by a single space.
    """
    out = ""
    for s in t.split():
        if (len(out) - out.rfind("\n")) + len(s) > width:
            out += "\n" + subsequent_indent
        out += s + " "
    return out


def setup_apt_listchanges():
    " deal with apt-listchanges "
    conf = "/etc/apt/listchanges.conf"
    if os.path.exists(conf):
        # check if mail is used by apt-listchanges
        cf = ConfigParser.ConfigParser()
        cf.read(conf)
        if cf.has_section("apt") and cf.has_option("apt", "frontend"):
            frontend = cf.get("apt", "frontend")
            if frontend == "mail" and os.path.exists("/usr/sbin/sendmail"):
                # mail frontend and sendmail, we are fine
                logging.debug(
                    "apt-listchanges is set to mail frontend, ignoring")
                return
    # setup env (to play it safe) and return
    os.putenv("APT_LISTCHANGES_FRONTEND", "none")


def send_summary_mail(pkgs, res, pkgs_kept_back, mem_log, logfile_dpkg):
    """ send mail (if configured in Unattended-Upgrade::Mail)

    pkgs is a whitespace separated string of the upgraded package
    names, res the result that is reported, pkgs_kept_back a list of
    package names, mem_log an object with a getvalue() method that holds
    the log of this run and logfile_dpkg the path of the dpkg log that
    gets included in the mail.
    """
    email = apt_pkg.Config.Find("Unattended-Upgrade::Mail", "")
    if not email:
        return
    if not os.path.exists("/usr/bin/mail"):
        logging.error(_("No '/usr/bin/mail', can not send mail. "
                        "You probably want to install the 'mailx' package."))
        return
    logging.debug("Sending mail with '%s' to '%s'" % (logfile_dpkg, email))
    mail = subprocess.Popen(["/usr/bin/mail",
                             "-s", _("unattended-upgrades result "
                                     "for '%s'") % host(),
                             email],
                            stdin=subprocess.PIPE)
    s = _("Unattended upgrade returned: %s\n\n") % res
    s += _("Packages that are upgraded:\n")
    s += " " + wrap(pkgs, 70, " ")
    s += "\n"
    if pkgs_kept_back:
        s += _("Packages with upgradable origin but kept back:\n")
        s += " " + wrap(" ".join(pkgs_kept_back), 70, " ")
        s += "\n"
    s += "\n"
    s += _("Package installation log:") + "\n"
    with open(logfile_dpkg) as dpkg_log:
        s += dpkg_log.read()
    s += "\n\n"
    s += _("Unattended-upgrades log:\n")
    s += mem_log.getvalue()
    mail.stdin.write(s)
    mail.stdin.close()
    ret = mail.wait()
    logging.debug("mail returned: %s" % ret)


def main():
    """Run one unattended upgrade.

    Marks all upgradable packages from the allowed origins, downloads
    them, drops packages that are untrusted or would cause a conffile
    prompt, installs the rest with stdout/stderr redirected into a dpkg
    log file, optionally sends a summary mail and optionally reboots.

    NOTE: reads the module level name "logdir" that is set up in the
    "__main__" section below.
    """
    # init the options
    parser = OptionParser()
    parser.add_option("-d", "--debug",
                      action="store_true", dest="debug", default=False,
                      help=_("print debug messages"))
    parser.add_option("", "--dry-run",
                      action="store_true", default=False,
                      help=_("Simulation, download but do not install"))
    (options, args) = parser.parse_args()

    # setup logging
    logger = logging.getLogger()
    mem_log = StringIO()
    if options.debug:
        logger.setLevel(logging.DEBUG)
        stderr_handler = logging.StreamHandler()
        logger.addHandler(stderr_handler)
    if apt_pkg.Config.Find("Unattended-Upgrade::Mail", ""):
        mem_log_handler = logging.StreamHandler(mem_log)
        logger.addHandler(mem_log_handler)

    # format (origin, archive), e.g. ("Ubuntu","dapper-security")
    allowed_origins = [
        entry.split() for entry in
        apt_pkg.Config.ValueList("Unattended-Upgrade::Allowed-Origins")]

    # pkgs that are (for some reason) not safe to install
    blacklisted_pkgs = apt_pkg.Config.ValueList(
        "Unattended-Upgrade::Package-Blacklist")
    logging.info(_("Initial blacklisted packages: %s"),
                 " ".join(blacklisted_pkgs))
    logging.info(_("Starting unattended upgrades script"))

    # display available origin
    logging.info(_("Allowed origins are: %s") %
                 [str(origin) for origin in allowed_origins])

    # check and get lock
    try:
        apt_pkg.PkgSystemLock()
    except SystemError:
        logging.error(_("Lock could not be acquired (another package "
                        "manager running?)"))
        print(_("Cache lock can not be acquired, exiting"))
        sys.exit(1)

    # get a cache
    rootdir = None
    cache = UnattendedUpgradesCache(rootdir=rootdir,
                                    allowed_origins=allowed_origins)
    if cache._depcache.BrokenCount > 0:
        print(_("Cache has broken packages, exiting"))
        logging.error(_("Cache has broken packages, exiting"))
        sys.exit(1)
    # speed things up with latest apt; the object is not used directly
    # but the reference is kept on purpose
    actiongroup = apt_pkg.GetPkgActionGroup(cache._depcache)

    # find out about the packages that are upgradable (in a allowed_origin)
    pkgs_to_upgrade = []
    pkgs_kept_back = []
    pkgs_auto_removable = set([pkg.name for pkg in cache
                               if pkg.isAutoRemovable])
    # now do the actual upgrade
    for pkg in cache:
        if options.debug and pkg.isUpgradable:
            logging.debug("Checking: %s (%s)" % (
                pkg.name, [str(origin) for origin in pkg.candidate.origins]))
        if (pkg.isUpgradable and
                is_allowed_origin(pkg.candidate, allowed_origins)):
            try:
                pkg.markUpgrade()
                if check_changes_for_sanity(cache, allowed_origins,
                                            blacklisted_pkgs):
                    pkgs_to_upgrade.append(pkg)
                else:
                    logging.debug("sanity check failed")
                    rewind_cache(cache, pkgs_to_upgrade)
                    pkgs_kept_back.append(pkg.name)
            except SystemError as e:
                # can't upgrade
                logging.warning(_("package '%s' upgradable but fails to be marked for upgrade (%s)") % (pkg.name, e))
                rewind_cache(cache, pkgs_to_upgrade)
                pkgs_kept_back.append(pkg.name)

    pkgs = "\n".join([pkg.name for pkg in pkgs_to_upgrade])
    logging.debug("pkgs that look like they should be upgraded: %s" % pkgs)

    # download what looks good
    if options.debug:
        fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress())
    else:
        fetcher = apt_pkg.GetAcquire()
    sources = apt_pkg.GetPkgSourceList()
    sources.ReadMainList()
    recs = cache._records
    pm = apt_pkg.GetPackageManager(cache._depcache)
    try:
        pm.GetArchives(fetcher, sources, recs)
    except SystemError as e:
        logging.error(_("GetArchives() failed: '%s'") % e)
    res = fetcher.Run()

    if dpkg_conffile_prompt():
        # now check the downloaded debs for conffile conflicts and build
        # a blacklist
        for item in fetcher.Items:
            logging.debug("%s" % item)
            if item.Status == item.StatError:
                print(_("An error ocured: '%s'") % item.ErrorText)
                logging.error(_("An error ocured: '%s'") % item.ErrorText)
            if not item.Complete:
                print(_("The URI '%s' failed to download, aborting") %
                      item.desc_uri)
                logging.error(_("The URI '%s' failed to download, aborting") %
                              item.desc_uri)
                sys.exit(1)
            if not os.path.exists(item.DestFile):
                # the message is formatted *after* the gettext lookup,
                # otherwise the translation can never be found
                print(_("Download finished, but file '%s' not there?!?") %
                      item.DestFile)
                logging.error("Download finished, but file '%s' not there?!?" %
                              item.DestFile)
                sys.exit(1)
            if not item.IsTrusted:
                blacklisted_pkgs.append(pkgname_from_deb(item.DestFile))
            if conffile_prompt(item.DestFile):
                # FIXME: skip package (means to re-run the whole marking again
                # and making sure that the package will not be pulled in by
                # some other package again!
                deb_pkgname = pkgname_from_deb(item.DestFile)
                print(_("Package '%s' has conffile prompt and needs to be upgraded manually") % deb_pkgname)
                logging.warning(_("Package '%s' has conffile prompt and needs to be upgraded manually") % deb_pkgname)
                blacklisted_pkgs.append(deb_pkgname)
                pkgs_kept_back.append(deb_pkgname)

        # redo the selection about the packages to upgrade based on the new
        # blacklist
        logging.debug("blacklist: %s" % blacklisted_pkgs)
        # find out about the packages that are upgradable (in a allowed_origin)
        if len(blacklisted_pkgs) > 0:
            cache.clear()
            old_pkgs_to_upgrade = pkgs_to_upgrade[:]
            pkgs_to_upgrade = []
            for pkg in old_pkgs_to_upgrade:
                logging.debug("Checking (blacklist): %s" % (pkg.name))
                pkg.markUpgrade()
                if check_changes_for_sanity(cache, allowed_origins,
                                            blacklisted_pkgs):
                    pkgs_to_upgrade.append(pkg)
                else:
                    if pkg.name not in pkgs_kept_back:
                        pkgs_kept_back.append(pkg.name)
                    logging.info(_("package '%s' not upgraded") % pkg.name)
                    rewind_cache(cache, pkgs_to_upgrade)
    else:
        logging.debug("dpkg is configured not to cause conffile prompts")

    # do auto-remove
    if apt_pkg.Config.FindB("Unattended-Upgrade::Remove-Unused-Dependencies",
                            False):
        now_auto_removable = set([pkg.name for pkg in cache
                                  if pkg.isAutoRemovable])
        for pkgname in now_auto_removable - pkgs_auto_removable:
            logging.debug("marking %s for remove" % pkgname)
            cache[pkgname].markDelete()
        logging.info(_("Packages that are auto removed: '%s'") %
                     " ".join(now_auto_removable - pkgs_auto_removable))

    logging.debug("InstCount=%i DelCount=%i BrokenCout=%i" % (
        cache._depcache.InstCount,
        cache._depcache.DelCount,
        cache._depcache.BrokenCount))

    # exit if there is nothing to do and nothing to report
    if (len(pkgs_to_upgrade) == 0) and (len(pkgs_kept_back) == 0):
        logging.info(_("No packages found that can be upgraded unattended"))
        sys.exit(0)

    # check if we are in dry-run mode
    if options.dry_run:
        logging.info("Option --dry-run given, *not* performing real actions")
        apt_pkg.Config.Set("Debug::pkgDPkgPM", "1")

    # do the install based on the new list of pkgs
    pkgs = " ".join([pkg.name for pkg in pkgs_to_upgrade])
    logging.info(_("Packages that are upgraded: %s") % pkgs)

    # set debconf to NON_INTERACTIVE, redirect output
    os.putenv("DEBIAN_FRONTEND", "noninteractive")
    setup_apt_listchanges()

    # redirect to log
    REDIRECT_INPUT = os.devnull
    fd = os.open(REDIRECT_INPUT, os.O_RDWR)
    os.dup2(fd, 0)

    now = datetime.datetime.now()
    logfile_dpkg = logdir + 'unattended-upgrades-dpkg_%s.log' % now.isoformat('_')
    logging.info(_("Writing dpkg log to '%s'") % logfile_dpkg)
    fd = os.open(logfile_dpkg, os.O_RDWR | os.O_CREAT, 0o644)
    # keep the original stdout/stderr so that they can be restored once
    # the installation is done
    old_stdout = os.dup(1)
    old_stderr = os.dup(2)
    os.dup2(fd, 1)
    os.dup2(fd, 2)

    # create a new package-manager. the blacklist may have changed
    # the markings in the depcache
    pm = apt_pkg.GetPackageManager(cache._depcache)
    if not pm.GetArchives(fetcher, sources, recs):
        logging.error(_("pm.GetArchives() failed"))
    # run the fetcher again (otherwise local file://
    # URIs are unhappy (see LP: #56832)
    res = fetcher.Run()
    # unlock the cache
    try:
        apt_pkg.PkgSystemUnLock()
    except SystemError:
        # deliberately ignored, the install below is attempted anyway
        pass
    # lock for the shutdown check - its fine if the system
    # is shutdown while downloading but not so much while installing
    apt_pkg.GetLock("/var/run/unattended-upgrades.lock")
    # now do the actual install
    error = None
    try:
        res = pm.DoInstall()
    except SystemError as e:
        error = e
        res = pm.ResultFailed
    finally:
        os.dup2(old_stdout, 1)
        os.dup2(old_stderr, 2)

    if res == pm.ResultFailed:
        logging.error(_("Installing the upgrades failed!"))
        # DoInstall() can report a failure without raising, in that case
        # there is no exception to show
        if error is not None:
            logging.error(_("error message: '%s'") % error)
        logging.error(_("dpkg returned a error! See '%s' for details") %
                      logfile_dpkg)
    else:
        logging.info(_("All upgrades installed"))

    # send a mail (if needed)
    pkg_install_success = (res != pm.ResultFailed)
    send_summary_mail(
        pkgs, pkg_install_success, pkgs_kept_back, mem_log, logfile_dpkg)

    # auto-reboot (if required and the config for this is set)
    if (apt_pkg.Config.FindB("Unattended-Upgrade::Automatic-Reboot", False) and
            os.path.exists("/var/run/reboot-required")):
        logging.warning("Found /var/run/reboot-required, rebooting")
        subprocess.call(["/sbin/reboot"])


if __name__ == "__main__":
    localesApp = "unattended-upgrades"
    localesDir = "/usr/share/locale"
    gettext.bindtextdomain(localesApp, localesDir)
    gettext.textdomain(localesApp)

    if os.getuid() != 0:
        print(_("You need to be root to run this application"))
        sys.exit(1)

    if not os.path.exists("/var/log/unattended-upgrades"):
        os.makedirs("/var/log/unattended-upgrades")

    # init the logging; "logdir" is also read by main()
    logdir = apt_pkg.Config.FindDir("APT::UnattendedUpgrades::LogDir",
                                    "/var/log/unattended-upgrades/")
    logfile = logdir + apt_pkg.Config.Find("APT::UnattendedUpgrades::LogFile",
                                           "unattended-upgrades.log")
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=logfile)

    # run the main code
    main()