1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2015 Stephen Warren
3# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
4
5# Common logic to interact with U-Boot via the console. This class provides
6# the interface that tests use to execute U-Boot shell commands and wait for
7# their results. Sub-classes exist to perform board-type-specific setup
8# operations, such as spawning a sub-process for Sandbox, or attaching to the
9# serial console of real hardware.
10
11import multiplexed_log
12import os
13import pytest
14import re
15import sys
16import u_boot_spawn
17
# Regexes for text we expect U-Boot to send to the console.
# The two sign-on patterns capture the whole banner text in group 1.
pattern_u_boot_spl_signon = re.compile('(U-Boot spl \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
pattern_error_notification = re.compile('## Error: ')
pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')

# Indices into each (identifier, compiled regex) entry of bad_pattern_defs.
PAT_ID = 0
PAT_RE = 1

# Table of console output that is treated as an error when it shows up while
# the console code is waiting for something else. Each entry pairs a short
# identifier with its compiled regex. The identifier is the key used in
# ConsoleBase.disable_check_count and is the text appended to the
# "Bad pattern found on console: " exception message.
bad_pattern_defs = (
    ('spl_signon', pattern_u_boot_spl_signon),
    ('main_signon', pattern_u_boot_main_signon),
    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
    ('unknown_command', pattern_unknown_command),
    ('error_notification', pattern_error_notification),
    ('error_please_reset', pattern_error_please_reset),
)
37
class ConsoleDisableCheck(object):
    """Context manager that suspends one console output error check.

    While the with block is active, the named check is not applied to the
    console output. This allows a test to deliberately run a command that is
    known to trigger that check, in order to verify that the error condition
    really is reported. Disabling is counted, so nested uses for the same
    check type work as expected.

    Instances are created by ConsoleBase::disable_check(); this class is not
    intended for direct usage."""

    def __init__(self, console, check_type):
        self.console, self.check_type = console, check_type

    def _adjust(self, delta):
        # Update the nesting count for this check, then have the console
        # rebuild its list of active bad patterns from the new counts.
        counts = self.console.disable_check_count
        counts[self.check_type] = counts[self.check_type] + delta
        self.console.eval_bad_patterns()

    def __enter__(self):
        self._adjust(1)

    def __exit__(self, extype, value, traceback):
        self._adjust(-1)
57
class ConsoleSetupTimeout(object):
    """Context manager (for Python's with statement) that temporarily sets up
    timeout for specific command. This is useful when execution time is greater
    than default 30s.

    The requested timeout is applied when the with block is entered, and the
    value that was in effect at that moment is restored when the block exits.
    Merely constructing the object does not modify the console timeout."""

    def __init__(self, console, timeout):
        # Keep a reference to the spawn object that is current at
        # construction time; that is the object whose timeout is adjusted.
        self.p = console.p
        self.timeout = timeout
        # Refreshed in __enter__; set here so the attribute always exists.
        self.orig_timeout = self.p.timeout

    def __enter__(self):
        # Snapshot at entry rather than at construction, so a manager that
        # is created early (or re-used) restores the right value.
        self.orig_timeout = self.p.timeout
        self.p.timeout = self.timeout
        return self

    def __exit__(self, extype, value, traceback):
        self.p.timeout = self.orig_timeout
73
class ConsoleBase(object):
    """The interface through which test functions interact with the U-Boot
    console. This primarily involves executing shell commands, capturing their
    results, and checking for common error conditions. Some common utilities
    are also provided."""

    def __init__(self, log, config, max_fifo_fill):
        """Initialize a U-Boot console connection.

        Can only usefully be called by sub-classes.

        Args:
            log: A multiplexed_log.Logfile object, to which the U-Boot output
                will be logged.
            config: A configuration data structure, as built by conftest.py.
            max_fifo_fill: The maximum number of characters to send to U-Boot
                command-line before waiting for U-Boot to echo the characters
                back. For UART-based HW without HW flow control, this value
                should be set less than the UART RX FIFO size to avoid
                overflow, assuming that U-Boot can't keep up with full-rate
                traffic at the baud rate.

        Returns:
            Nothing.
        """

        self.log = log
        self.config = config
        self.max_fifo_fill = max_fifo_fill

        self.logstream = self.log.get_stream('console', sys.stdout)

        # Array slice removes leading/trailing quotes
        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
        self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
        # The spawn object; created lazily by ensure_spawned().
        self.p = None
        # Per-check nesting count; a check is active while its count is 0.
        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
        self.eval_bad_patterns()

        self.at_prompt = False
        self.at_prompt_logevt = None

    def eval_bad_patterns(self):
        """Rebuild the lists of currently active bad patterns.

        A pattern from bad_pattern_defs is active while its entry in
        self.disable_check_count is zero. self.bad_patterns receives the
        regexes and self.bad_pattern_ids the matching identifiers, in the
        same order, so that an index into one is valid for the other.

        Args:
            None.

        Returns:
            Nothing.
        """

        active = [pat for pat in bad_pattern_defs
                  if self.disable_check_count[pat[PAT_ID]] == 0]
        self.bad_patterns = [pat[PAT_RE] for pat in active]
        self.bad_pattern_ids = [pat[PAT_ID] for pat in active]

    def close(self):
        """Terminate the connection to the U-Boot console.

        This function is only useful once all interaction with U-Boot is
        complete. Once this function is called, data cannot be sent to or
        received from U-Boot.

        Args:
            None.

        Returns:
            Nothing.
        """

        if self.p:
            self.p.close()
        self.logstream.close()

    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
            wait_for_prompt=True):
        """Execute a command via the U-Boot console.

        The command is always sent to U-Boot.

        U-Boot echoes any command back to its output, and this function
        typically waits for that to occur. The wait can be disabled by setting
        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
        interrupt a long-running command such as "ums".

        Command execution is typically triggered by sending a newline
        character. This can be disabled by setting send_nl=False, which is
        also useful when sending CTRL-C.

        This function typically waits for the command to finish executing, and
        returns the console output that it generated. This can be disabled by
        setting wait_for_prompt=False, which is useful when invoking a long-
        running command such as "ums".

        Args:
            cmd: The command to send.
            wait_for_echo: Boolean indicating whether to wait for U-Boot to
                echo the command text back to its output.
            send_nl: Boolean indicating whether to send a newline character
                after the command string.
            wait_for_prompt: Boolean indicating whether to wait for the
                command prompt to be sent by U-Boot. This typically occurs
                immediately after the command has been executed.

        Returns:
            If wait_for_prompt == False:
                Nothing.
            Else:
                The output from U-Boot during command execution. In other
                words, the text U-Boot emitted between the point it echoed
                the command string and emitted the subsequent command prompt.

        Raises:
            Exception: If one of the active bad patterns is seen on the
                console. Any exception raised while talking to U-Boot is
                logged, the connection is torn down via cleanup_spawn(), and
                the exception is re-raised.
        """

        # If the log has moved on since the prompt was last seen, write the
        # prompt into the log again so that the command appears after it.
        if self.at_prompt and \
                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
            self.logstream.write(self.prompt, implicit=True)

        try:
            self.at_prompt = False
            if send_nl:
                cmd += '\n'
            while cmd:
                # Limit max outstanding data, so UART FIFOs don't overflow
                chunk = cmd[:self.max_fifo_fill]
                cmd = cmd[self.max_fifo_fill:]
                self.p.send(chunk)
                if not wait_for_echo:
                    continue
                chunk = re.escape(chunk)
                # The newline that was sent may be echoed as either CR or LF.
                chunk = chunk.replace('\\\n', '[\r\n]')
                m = self.p.expect([chunk] + self.bad_patterns)
                if m != 0:
                    self.at_prompt = False
                    # Index 0 is the echo; bad patterns start at index 1.
                    raise Exception('Bad pattern found on console: ' +
                                    self.bad_pattern_ids[m - 1])
            if not wait_for_prompt:
                return
            m = self.p.expect([self.prompt_compiled] + self.bad_patterns)
            if m != 0:
                self.at_prompt = False
                raise Exception('Bad pattern found on console: ' +
                                self.bad_pattern_ids[m - 1])
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
            # Only strip \r\n; space/TAB might be significant if testing
            # indentation.
            return self.p.before.strip('\r\n')
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()

    def run_command_list(self, cmds):
        """Run a list of commands.

        This is a helper function to call run_command() with default arguments
        for each command in a list.

        Args:
            cmds: List of commands (each a string).
        Returns:
            A list of output strings from each command, one element for each
            command.
        """
        return [self.run_command(cmd) for cmd in cmds]

    def ctrlc(self):
        """Send a CTRL-C character to U-Boot.

        This is useful in order to stop execution of long-running synchronous
        commands such as "ums".

        Args:
            None.

        Returns:
            Nothing.
        """

        self.log.action('Sending Ctrl-C')
        self.run_command(chr(3), wait_for_echo=False, send_nl=False)

    def wait_for(self, text):
        """Wait for a pattern to be emitted by U-Boot.

        This is useful when a long-running command such as "dfu" is executing,
        and it periodically emits some text that should show up at a specific
        location in the log file.

        Args:
            text: The text to wait for; either a string (containing raw text,
                not a regular expression) or an re object.

        Returns:
            Nothing.

        Raises:
            Exception: If one of the active bad patterns is seen on the
                console before the requested text.
        """

        # isinstance() rather than an exact type comparison, so that str
        # sub-classes are treated as raw text too.
        if isinstance(text, str):
            text = re.escape(text)
        m = self.p.expect([text] + self.bad_patterns)
        if m != 0:
            raise Exception('Bad pattern found on console: ' +
                            self.bad_pattern_ids[m - 1])

    def drain_console(self):
        """Read from and log the U-Boot console for a short time.

        U-Boot's console output is only logged when the test code actively
        waits for U-Boot to emit specific data. There are cases where tests
        can fail without doing this. For example, if a test asks U-Boot to
        enable USB device mode, then polls until a host-side device node
        exists. In such a case, it is useful to log U-Boot's console output
        in case U-Boot printed clues as to why the host-side event did not
        occur. This function will do that.

        Args:
            None.

        Returns:
            Nothing.
        """

        # If we are already not connected to U-Boot, there's nothing to drain.
        # This should only happen when a previous call to run_command() or
        # wait_for() failed (and hence the output has already been logged), or
        # the system is shutting down.
        if not self.p:
            return

        orig_timeout = self.p.timeout
        try:
            # Drain the log for a relatively short time.
            self.p.timeout = 1000
            # Wait for something U-Boot will likely never send. This will
            # cause the console output to be read and logged.
            self.p.expect(['This should never match U-Boot output'])
        except Exception:
            # We expect a timeout, since U-Boot won't print what we waited
            # for. Squash it when it happens.
            #
            # Squash any other error too. This function is only used to
            # drain (and log) the U-Boot console output after a failed test.
            # The U-Boot process will be restarted, or target board reset, once
            # this function returns. So, we don't care about detecting any
            # additional errors, so they're squashed so that the rest of the
            # post-test-failure cleanup code can continue operation, and
            # correctly terminate any log sections, etc.
            #
            # KeyboardInterrupt and SystemExit are deliberately not caught,
            # so that the user can still abort the test run from here.
            pass
        finally:
            self.p.timeout = orig_timeout

    def ensure_spawned(self):
        """Ensure a connection to a correctly running U-Boot instance.

        This may require spawning a new Sandbox process or resetting target
        hardware, as defined by the implementation sub-class.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.

        Raises:
            Exception: If one of the active bad patterns is seen while
                waiting for the sign-on messages or the first prompt. Any
                exception raised during start-up is logged, the connection
                is torn down via cleanup_spawn(), and the exception is
                re-raised.
        """

        if self.p:
            return
        try:
            self.log.start_section('Starting U-Boot')
            self.at_prompt = False
            self.p = self.get_spawn()
            # Real targets can take a long time to scroll large amounts of
            # text if LCD is enabled. This value may need tweaking in the
            # future, possibly per-test to be optimal. This works for 'help'
            # on board 'seaboard'.
            if not self.config.gdbserver:
                self.p.timeout = 30000
            self.p.logfile_read = self.logstream
            bcfg = self.config.buildconfig
            config_spl = bcfg.get('config_spl', 'n') == 'y'
            config_spl_serial_support = bcfg.get('config_spl_serial_support',
                                                 'n') == 'y'
            env_spl_skipped = self.config.env.get('env__spl_skipped',
                                                  False)
            # Only wait for the SPL banner when SPL is built with serial
            # output and the board environment does not say SPL is skipped.
            if config_spl and config_spl_serial_support and not env_spl_skipped:
                m = self.p.expect([pattern_u_boot_spl_signon] +
                                  self.bad_patterns)
                if m != 0:
                    raise Exception('Bad pattern found on SPL console: ' +
                                    self.bad_pattern_ids[m - 1])
            m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns)
            if m != 0:
                raise Exception('Bad pattern found on console: ' +
                                self.bad_pattern_ids[m - 1])
            self.u_boot_version_string = self.p.after
            while True:
                m = self.p.expect([self.prompt_compiled,
                    pattern_stop_autoboot_prompt] + self.bad_patterns)
                if m == 0:
                    break
                if m == 1:
                    # Send a key press to interrupt the autoboot countdown.
                    self.p.send(' ')
                    continue
                # Two expected patterns precede the bad patterns here.
                raise Exception('Bad pattern found on console: ' +
                                self.bad_pattern_ids[m - 2])
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()
            self.log.end_section('Starting U-Boot')

    def cleanup_spawn(self):
        """Shut down all interaction with the U-Boot instance.

        This is used when an error is detected prior to re-establishing a
        connection with a fresh U-Boot instance.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.
        """

        try:
            if self.p:
                self.p.close()
        except Exception:
            # Closing is best-effort; the connection is being discarded
            # regardless of whether it shut down cleanly.
            pass
        finally:
            # Always forget the spawn object, even if closing it was
            # interrupted, so that ensure_spawned() creates a fresh one.
            self.p = None

    def restart_uboot(self):
        """Shut down and restart U-Boot."""
        self.cleanup_spawn()
        self.ensure_spawned()

    def get_spawn_output(self):
        """Return the start-up output from U-Boot

        Returns:
            The output produced by ensure_spawned(), as a string, or None if
            there is currently no connection to U-Boot.
        """
        if self.p:
            return self.p.get_expect_output()
        return None

    def validate_version_string_in_text(self, text):
        """Assert that a command's output includes the U-Boot signon message.

        This is primarily useful for validating the "version" command without
        duplicating the signon text regex in a test function.

        Args:
            text: The command output text to check.

        Returns:
            Nothing. An exception is raised if the validation fails.
        """

        assert(self.u_boot_version_string in text)

    def disable_check(self, check_type):
        """Temporarily disable an error check of U-Boot's output.

        Create a new context manager (for use with the "with" statement) which
        temporarily disables a particular console output error check.

        Args:
            check_type: The type of error-check to disable. Valid values may
            be found in self.disable_check_count above.

        Returns:
            A context manager object.
        """

        return ConsoleDisableCheck(self, check_type)

    def temporary_timeout(self, timeout):
        """Temporarily set up different timeout for commands.

        Create a new context manager (for use with the "with" statement) which
        temporarily changes the timeout.

        Args:
            timeout: Time in milliseconds.

        Returns:
            A context manager object.
        """

        return ConsoleSetupTimeout(self, timeout)
469