xref: /openbmc/openbmc/poky/meta/lib/patchtest/utils.py (revision 73bd93f1)
1# ex:ts=4:sw=4:sts=4:et
2# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
3#
4# utils: common methods used by the patchtest framework
5#
6# Copyright (C) 2016 Intel Corporation
7#
8# SPDX-License-Identifier: GPL-2.0-only
9#
10
11import os
12import subprocess
13import logging
14import re
15import mailbox
16
class CmdException(Exception):
    """ Exception whose attributes mirror the keys of the dict given at creation.

         Input:
             cmd: dict describing the failed command; exec_cmd passes its
             result dict (keys such as 'cmd', 'stdout', 'stderr', 'returncode')

         Any key of that dict can be read as an attribute of the exception;
         reading a name that is not a key yields None instead of raising.
    """
    def __init__(self, cmd):
        super().__init__(cmd)
        self._cmd = cmd

    def __getattr__(self, name):
        # __getattr__ only runs when regular lookup fails. If '_cmd' itself is
        # the missing name (instance built without __init__, e.g. by copy or
        # pickle), bail out instead of recursing forever via self._cmd.
        if name == '_cmd':
            raise AttributeError(name)
        # dict.has_key() is gone in Python 3; use the 'in' operator
        if name in self._cmd:
            return self._cmd[name]
        return None
26
def exec_cmd(cmd, cwd, ignore_error=False, input=None, strip=True, updateenv=None):
    """ Executes a single command and collects its output

         Input:

            cmd: dict containing the following keys:

                cmd : the command itself as an array of strings
                ignore_error: if True, no exception is raised when the
                command exits with a non-zero return code
                strip: indicates if strip is done on the output (stdout and stderr)
                input: input data to the command (stdin), as text
                updateenv: environment variables added on top of the current
                process environment for this command only

            NOTE: every key except 'cmd' is optional; if not included, the
            default is the value of the argument with the same name

            cwd: directory where the command is executed
            ignore_error: raise CmdException if the command returns non-zero
            and this value is False
            input: input data (stdin) for the command
            strip: strip stdout and stderr before returning them
            updateenv: dict of extra environment variables (None means none)

         Output: a new dict containing the following keys:

             cmd: the command, as a list
             ignore_error: the effective value used
             strip: the effective value used
             input: the effective value used
             updateenv: the effective value used
             stdout: Standard output after command's execution
             stderr: Standard error after command's execution
             returncode: Return code after command's execution

         Raises: CmdException when no command is given, or when the command
         returns non-zero and ignore_error is False (the exception carries
         the output dict described above)

    """
    # None instead of a shared {} default, which would otherwise be handed
    # back to every caller inside the result dict
    if updateenv is None:
        updateenv = {}

    # argument values are the defaults; keys present in 'cmd' override them
    _cmd = {
        'cmd': '',
        'ignore_error': ignore_error,
        'strip': strip,
        'input': input,
        'updateenv': updateenv,
    }
    _cmd.update(cmd)

    if not _cmd['cmd']:
        raise CmdException({'cmd':None, 'stderr':'no command given'})

    # work on a copy: updating os.environ itself would leak the extra
    # variables into this process and into every later command
    env = os.environ.copy()
    env.update(_cmd['updateenv'] or {})

    _command = list(_cmd['cmd'])
    p = subprocess.Popen(_command,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         universal_newlines=True,
                         cwd=cwd,
                         env=env)

    # execute the command and strip output
    (_stdout, _stderr) = p.communicate(_cmd['input'])
    if _cmd['strip']:
        _stdout, _stderr = _stdout.strip(), _stderr.strip()

    # generate the result
    result = _cmd
    result.update({'cmd':_command,'stdout':_stdout,'stderr':_stderr,'returncode':p.returncode})

    # launch exception if necessary
    if not _cmd['ignore_error'] and p.returncode:
        raise CmdException(result)

    return result
101
def exec_cmds(cmds, cwd):
    """ Executes several commands, one after the other

         Input:
             cmds: Array of command dicts, each in the form exec_cmd accepts
             cwd: directory where commands are executed

         Output: Array with one exec_cmd result per command, in the same order
    """
    return [exec_cmd(single, cwd) for single in cmds]
119
def logger_create(name):
    """ Return the logger called 'name', set to INFO with a message-only stream handler

         Input:
             name: name handed to logging.getLogger

         Output: the configured logging.Logger

         Calling this more than once with the same name attaches the stream
         handler only once, so messages are not printed multiple times.
    """
    logger = logging.getLogger(name)

    # getLogger returns the same object for the same name; handlers attached
    # by an earlier call are tagged so they can be recognised here
    configured = any(getattr(h, '_patchtest_stream', False) for h in logger.handlers)
    if not configured:
        loggerhandler = logging.StreamHandler()
        loggerhandler.setFormatter(logging.Formatter("%(message)s"))
        loggerhandler._patchtest_stream = True
        logger.addHandler(loggerhandler)

    logger.setLevel(logging.INFO)
    return logger
127
def get_subject_prefix(path):
    """ Get the bracketed prefix from the subject of the first message in an mbox

         Input:
             path: path to the mbox file

         Output: the text from the first '[' up to the last ']' of the subject
         (brackets included), or "" if the mbox is empty, the first message
         has no subject, or the subject has no bracketed part

         NOTE: mailbox.mbox is opened with its default arguments, which create
         an empty mbox file when 'path' does not exist
    """
    prefix = ""
    mbox = mailbox.mbox(path)

    try:
        if len(mbox):
            subject = mbox[0]['subject']
            if subject:
                # str() so a Header object (non-ASCII subjects) is searchable too
                # greedy match with DOTALL: spans all bracket groups, even
                # when the subject is folded over several lines
                match = re.search(r"(\[.*\])", str(subject), re.DOTALL)
                if match:
                    prefix = match.group(1)
    finally:
        # release the file handle the mailbox keeps open
        mbox.close()

    return prefix
141
def valid_branch(branch):
    """ Tell whether a subject-prefix tag can be a branch name

         Tags are compared case-insensitively. A tag is rejected when it
         starts with 'patch', 'rfc' or 'resend', with a version marker
         ('v' followed by digits) or with a series counter (digits '/' digits).

         Output: True if the tag is acceptable as a branch name, False otherwise
    """
    tag = branch.lower()

    # patch-related keywords
    if tag.startswith(('patch', 'rfc', 'resend')):
        return False

    # version marker, e.g. 'v2'
    if re.match(r'v\d+', tag):
        return False

    # series counter, e.g. '1/3'
    if re.match(r'\d+/\d+', tag):
        return False

    return True
153
def get_branch(path):
    """ Get the branch name from mbox

         The subject prefix of the first message is stripped of its outer
         brackets and split on commas; the first piece that valid_branch
         accepts is the branch.

         Output: the branch name, or None when there is no subject prefix
         or none of its pieces is a valid branch name
    """
    fullprefix = get_subject_prefix(path)
    if not fullprefix:
        return None

    pieces = (piece.strip() for piece in fullprefix.strip('[]').split(','))
    return next((piece for piece in pieces if valid_branch(piece)), None)
168
169