1# Copyright (c) 2015 Stephen Warren
2# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
3#
4# SPDX-License-Identifier: GPL-2.0
5
6# Common logic to interact with U-Boot via the console. This class provides
7# the interface that tests use to execute U-Boot shell commands and wait for
8# their results. Sub-classes exist to perform board-type-specific setup
9# operations, such as spawning a sub-process for Sandbox, or attaching to the
10# serial console of real hardware.
11
12import multiplexed_log
13import os
14import pytest
15import re
16import sys
17import u_boot_spawn
18
# Regexes for text we expect U-Boot to send to the console.
#
# The two sign-on patterns capture the whole banner (from "U-Boot" up to the
# last ")" on the line) as group 1. The main sign-on pattern requires a digit
# directly after "U-Boot ", so it does not match the SPL banner.
pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
pattern_error_notification = re.compile('## Error: ')
pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')

# Indices into each (id, regex) entry of bad_pattern_defs below.
PAT_ID = 0
PAT_RE = 1

# Table of (id, compiled regex) pairs that ConsoleBase treats as errors when
# they show up while it is waiting for some other, expected text. The id
# string is what appears in the resulting exception message and is the key
# accepted by ConsoleBase.disable_check().
# NOTE(review): the sign-on and autoboot patterns are presumably listed here
# because seeing them mid-test implies the board reset unexpectedly - confirm.
bad_pattern_defs = (
    ('spl_signon', pattern_u_boot_spl_signon),
    ('main_signon', pattern_u_boot_main_signon),
    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
    ('unknown_command', pattern_unknown_command),
    ('error_notification', pattern_error_notification),
    ('error_please_reset', pattern_error_please_reset),
)
38
class ConsoleDisableCheck(object):
    """Context manager (for Python's with statement) that temporarily disables
    the specified console output error check. This is useful when deliberately
    executing a command that is known to trigger one of the error checks, in
    order to test that the error condition is actually raised. This class is
    used internally by ConsoleBase::disable_check(); it is not intended for
    direct usage.

    Disabling is reference counted per check type, so these context managers
    may be nested; the check becomes active again once every nested manager
    for that check type has exited."""

    def __init__(self, console, check_type):
        """Remember which check to disable; nothing is changed until entry.

        Args:
            console: The console object whose disable_check_count dictionary
                and eval_bad_patterns() method are used.
            check_type: Key into console.disable_check_count identifying the
                check to disable.

        Returns:
            Nothing.
        """

        self.console = console
        self.check_type = check_type

    def _adjust(self, delta):
        """Add delta to the disable count and refresh the active patterns."""

        self.console.disable_check_count[self.check_type] += delta
        self.console.eval_bad_patterns()

    def __enter__(self):
        """Disable the check.

        Returns:
            This context manager, so that "with ... as x" binds a useful
            object.

        Raises:
            KeyError: If check_type is not a key of
                console.disable_check_count.
        """

        self._adjust(1)
        return self

    def __exit__(self, extype, value, traceback):
        """Undo the disable performed by __enter__.

        Exceptions raised inside the with block are not suppressed.
        """

        self._adjust(-1)
58
class ConsoleSetupTimeout(object):
    """Context manager (for Python's with statement) that temporarily sets up
    timeout for specific command. This is useful when execution time is greater
    than default 30s.

    The timeout is only changed while the with block is active: it is applied
    on entry and the previous value is restored on exit, including when the
    block exits with an exception."""

    def __init__(self, console, timeout):
        """Remember the requested timeout; nothing is changed until entry.

        Args:
            console: The console object whose spawn object (console.p) has
                its timeout attribute overridden.
            timeout: The timeout value to use inside the with block.

        Returns:
            Nothing.
        """

        self.console = console
        self.timeout = timeout
        self.p = None
        self.orig_timeout = None

    def __enter__(self):
        """Save the current timeout and apply the temporary one.

        Returns:
            This context manager.
        """

        # Look up the spawn object at entry rather than at construction, so
        # that creating this object without entering it has no side effects.
        self.p = self.console.p
        self.orig_timeout = self.p.timeout
        self.p.timeout = self.timeout
        return self

    def __exit__(self, extype, value, traceback):
        """Restore the timeout that was in effect when the block was entered.

        Exceptions raised inside the with block are not suppressed.
        """

        self.p.timeout = self.orig_timeout
74
class ConsoleBase(object):
    """The interface through which test functions interact with the U-Boot
    console. This primarily involves executing shell commands, capturing their
    results, and checking for common error conditions. Some common utilities
    are also provided too.

    This class calls self.get_spawn() to obtain the console connection but
    does not define it; it is expected to be provided by sub-classes."""

    def __init__(self, log, config, max_fifo_fill):
        """Initialize a U-Boot console connection.

        Can only usefully be called by sub-classes.

        Args:
            log: A multiplexed_log.Logfile object, to which the U-Boot output
                will be logged.
            config: A configuration data structure, as built by conftest.py.
            max_fifo_fill: The maximum number of characters to send to U-Boot
                command-line before waiting for U-Boot to echo the characters
                back. For UART-based HW without HW flow control, this value
                should be set less than the UART RX FIFO size to avoid
                overflow, assuming that U-Boot can't keep up with full-rate
                traffic at the baud rate.

        Returns:
            Nothing.
        """

        self.log = log
        self.config = config
        self.max_fifo_fill = max_fifo_fill

        self.logstream = self.log.get_stream('console', sys.stdout)

        # Array slice removes leading/trailing quotes
        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
        self.prompt_escaped = re.escape(self.prompt)
        # No connection yet; ensure_spawned() creates it on demand.
        self.p = None
        # Per-check nesting count; a check is active while its count is 0.
        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
        self.eval_bad_patterns()

        self.at_prompt = False
        self.at_prompt_logevt = None

    def eval_bad_patterns(self):
        """Recompute the lists of currently enabled error patterns.

        Sets self.bad_patterns (regexes) and self.bad_pattern_ids (their
        names, in the same order) to the entries of bad_pattern_defs whose
        disable count is zero.

        Args:
            None.

        Returns:
            Nothing.
        """

        active = [pat for pat in bad_pattern_defs
                  if self.disable_check_count[pat[PAT_ID]] == 0]
        self.bad_patterns = [pat[PAT_RE] for pat in active]
        self.bad_pattern_ids = [pat[PAT_ID] for pat in active]

    def _expect_checked(self, wanted):
        """Wait for one of the wanted patterns, failing on any bad pattern.

        Args:
            wanted: A list of patterns, any of which is an acceptable match.

        Returns:
            The index within wanted of the pattern that matched.

        Raises:
            Exception: If one of the currently enabled bad patterns matched
                instead. Exceptions from the underlying expect() call
                propagate unchanged.
        """

        m = self.p.expect(wanted + self.bad_patterns)
        if m >= len(wanted):
            # The bad patterns follow the wanted ones in the list passed to
            # expect(), so subtract the offset to index the id list.
            raise Exception('Bad pattern found on console: ' +
                            self.bad_pattern_ids[m - len(wanted)])
        return m

    def close(self):
        """Terminate the connection to the U-Boot console.

        This function is only useful once all interaction with U-Boot is
        complete. Once this function is called, data cannot be sent to or
        received from U-Boot.

        Args:
            None.

        Returns:
            Nothing.
        """

        if self.p:
            self.p.close()
        self.logstream.close()

    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
            wait_for_prompt=True):
        """Execute a command via the U-Boot console.

        The command is always sent to U-Boot.

        U-Boot echoes any command back to its output, and this function
        typically waits for that to occur. The wait can be disabled by setting
        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
        interrupt a long-running command such as "ums".

        Command execution is typically triggered by sending a newline
        character. This can be disabled by setting send_nl=False, which is
        also useful when sending CTRL-C.

        This function typically waits for the command to finish executing, and
        returns the console output that it generated. This can be disabled by
        setting wait_for_prompt=False, which is useful when invoking a long-
        running command such as "ums".

        Args:
            cmd: The command to send.
            wait_for_echo: Boolean indicating whether to wait for U-Boot to
                echo the command text back to its output.
            send_nl: Boolean indicating whether to send a newline character
                after the command string.
            wait_for_prompt: Boolean indicating whether to wait for the
                command prompt to be sent by U-Boot. This typically occurs
                immediately after the command has been executed.

        Returns:
            If wait_for_prompt == False:
                Nothing.
            Else:
                The output from U-Boot during command execution. In other
                words, the text U-Boot emitted between the point it echoed the
                command string and emitted the subsequent command prompts.

        Raises:
            Exception: If an enabled bad pattern is seen on the console, or
                if the underlying send/expect fails. In either case the error
                is logged and the connection is shut down via cleanup_spawn()
                before the exception is re-raised.
        """

        # If the log has moved on to a different event since the prompt was
        # last seen, write the prompt to the log stream again.
        # NOTE(review): presumably so the logged command still appears after a
        # prompt - confirm against multiplexed_log.
        if self.at_prompt and \
                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
            self.logstream.write(self.prompt, implicit=True)

        try:
            self.at_prompt = False
            if send_nl:
                cmd += '\n'
            while cmd:
                # Limit max outstanding data, so UART FIFOs don't overflow
                chunk = cmd[:self.max_fifo_fill]
                cmd = cmd[self.max_fifo_fill:]
                self.p.send(chunk)
                if not wait_for_echo:
                    continue
                # Match the echoed chunk literally, except that the newline
                # which was sent may be echoed as either CR or LF.
                chunk = re.escape(chunk)
                chunk = chunk.replace('\\\n', '[\r\n]')
                self._expect_checked([chunk])
            if not wait_for_prompt:
                return
            self._expect_checked([self.prompt_escaped])
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
            # Only strip \r\n; space/TAB might be significant if testing
            # indentation.
            return self.p.before.strip('\r\n')
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise

    def ctrlc(self):
        """Send a CTRL-C character to U-Boot.

        This is useful in order to stop execution of long-running synchronous
        commands such as "ums".

        Args:
            None.

        Returns:
            Nothing.
        """

        self.log.action('Sending Ctrl-C')
        # chr(3) is the CTRL-C character; it is not echoed and needs no
        # trailing newline, but the prompt is still waited for.
        self.run_command(chr(3), wait_for_echo=False, send_nl=False)

    def wait_for(self, text):
        """Wait for a pattern to be emitted by U-Boot.

        This is useful when a long-running command such as "dfu" is executing,
        and it periodically emits some text that should show up at a specific
        location in the log file.

        Args:
            text: The text to wait for; either a string (containing raw text,
                not a regular expression) or an re object.

        Returns:
            Nothing.

        Raises:
            Exception: If an enabled bad pattern is seen on the console
                before the requested text.
        """

        if isinstance(text, str):
            # Plain strings are matched literally, not as regexes.
            text = re.escape(text)
        self._expect_checked([text])

    def drain_console(self):
        """Read from and log the U-Boot console for a short time.

        U-Boot's console output is only logged when the test code actively
        waits for U-Boot to emit specific data. There are cases where tests
        can fail without doing this. For example, if a test asks U-Boot to
        enable USB device mode, then polls until a host-side device node
        exists. In such a case, it is useful to log U-Boot's console output
        in case U-Boot printed clues as to why the host-side event did not
        occur. This function will do that.

        Args:
            None.

        Returns:
            Nothing.
        """

        # If we are already not connected to U-Boot, there's nothing to drain.
        # This should only happen when a previous call to run_command() or
        # wait_for() failed (and hence the output has already been logged), or
        # the system is shutting down.
        if not self.p:
            return

        orig_timeout = self.p.timeout
        try:
            # Drain the log for a relatively short time.
            self.p.timeout = 1000
            # Wait for something U-Boot will likely never send. This will
            # cause the console output to be read and logged.
            self.p.expect(['This should never match U-Boot output'])
        except u_boot_spawn.Timeout:
            # Timing out is the expected way for the drain to finish.
            pass
        finally:
            self.p.timeout = orig_timeout

    def ensure_spawned(self):
        """Ensure a connection to a correctly running U-Boot instance.

        This may require spawning a new Sandbox process or resetting target
        hardware, as defined by the implementation sub-class.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.

        Raises:
            Exception: If an enabled bad pattern is seen while waiting for
                the sign-on message(s) or the prompt, or if spawning or
                waiting fails. The error is logged and the connection is
                shut down via cleanup_spawn() before re-raising.
        """

        if self.p:
            return
        try:
            self.log.start_section('Starting U-Boot')
            self.at_prompt = False
            self.p = self.get_spawn()
            # Real targets can take a long time to scroll large amounts of
            # text if LCD is enabled. This value may need tweaking in the
            # future, possibly per-test to be optimal. This works for 'help'
            # on board 'seaboard'.
            # When config.gdbserver is set, the spawn object's own timeout is
            # left untouched.
            if not self.config.gdbserver:
                self.p.timeout = 30000
            self.p.logfile_read = self.logstream
            bcfg = self.config.buildconfig
            config_spl = bcfg.get('config_spl', 'n') == 'y'
            config_spl_serial_support = bcfg.get('config_spl_serial_support',
                                                 'n') == 'y'
            env_spl_skipped = self.config.env.get('env__spl_skipped',
                                                  False)
            # Only wait for the SPL banner when SPL is built with serial
            # support and the environment does not say SPL is skipped.
            if config_spl and config_spl_serial_support and not env_spl_skipped:
                self._expect_checked([pattern_u_boot_spl_signon])
            self._expect_checked([pattern_u_boot_main_signon])
            # Remembered for validate_version_string_in_text().
            self.u_boot_version_string = self.p.after
            while True:
                m = self._expect_checked([self.prompt_escaped,
                                          pattern_stop_autoboot_prompt])
                if m == 0:
                    break
                # Autoboot countdown seen: send a key to interrupt it, then
                # keep waiting for the prompt.
                self.p.send(' ')
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.end_section('Starting U-Boot')

    def cleanup_spawn(self):
        """Shut down all interaction with the U-Boot instance.

        This is used when an error is detected prior to re-establishing a
        connection with a fresh U-Boot instance.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.
        """

        try:
            if self.p:
                self.p.close()
        except Exception:
            # Best effort: the connection is being discarded anyway, so a
            # failure to close it cleanly is deliberately ignored.
            pass
        finally:
            # Always forget the connection so ensure_spawned() creates a new
            # one next time, even if close() was interrupted.
            self.p = None

    def validate_version_string_in_text(self, text):
        """Assert that a command's output includes the U-Boot signon message.

        This is primarily useful for validating the "version" command without
        duplicating the signon text regex in a test function.

        The sign-on message checked for is the one recorded by
        ensure_spawned().

        Args:
            text: The command output text to check.

        Returns:
            Nothing. An exception is raised if the validation fails.
        """

        assert self.u_boot_version_string in text

    def disable_check(self, check_type):
        """Temporarily disable an error check of U-Boot's output.

        Create a new context manager (for use with the "with" statement) which
        temporarily disables a particular console output error check.

        Args:
            check_type: The type of error-check to disable. Valid values are
                the keys of self.disable_check_count, i.e. the ids listed in
                bad_pattern_defs.

        Returns:
            A context manager object.
        """

        return ConsoleDisableCheck(self, check_type)

    def temporary_timeout(self, timeout):
        """Temporarily set up different timeout for commands.

        Create a new context manager (for use with the "with" statement) which
        temporarily changes the timeout.

        Args:
            timeout: Time in milliseconds.

        Returns:
            A context manager object.
        """

        return ConsoleSetupTimeout(self, timeout)
425