1# Copyright (c) 2015 Stephen Warren
2# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
3#
4# SPDX-License-Identifier: GPL-2.0
5
6# Common logic to interact with U-Boot via the console. This class provides
7# the interface that tests use to execute U-Boot shell commands and wait for
8# their results. Sub-classes exist to perform board-type-specific setup
9# operations, such as spawning a sub-process for Sandbox, or attaching to the
10# serial console of real hardware.
11
12import multiplexed_log
13import os
14import pytest
15import re
16import sys
17import u_boot_spawn
18
# Regexes for text we expect U-Boot to send to the console.
# The two sign-on patterns capture the whole banner (from "U-Boot" up to and
# including the closing parenthesis on that line) in a group.
pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
pattern_error_notification = re.compile('## Error: ')
pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')

# Indices into each entry of bad_pattern_defs below: the entry's string
# identifier and its compiled regex, respectively.
PAT_ID = 0
PAT_RE = 1

# Table of (identifier, regex) pairs describing console output that is
# treated as an error when it shows up while waiting for something else.
# The identifiers are the keys of ConsoleBase.disable_check_count, i.e. the
# values accepted by ConsoleBase.disable_check(). ConsoleBase appends the
# regexes of all currently enabled entries to its expect() pattern lists, and
# uses the identifier in the exception message when one of them matches.
bad_pattern_defs = (
    ('spl_signon', pattern_u_boot_spl_signon),
    ('main_signon', pattern_u_boot_main_signon),
    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
    ('unknown_command', pattern_unknown_command),
    ('error_notification', pattern_error_notification),
    ('error_please_reset', pattern_error_please_reset),
)
38
class ConsoleDisableCheck(object):
    """Context manager (for Python's with statement) that switches off one
    console output error check for the duration of the with-body.

    This is meant for tests that deliberately run a command known to trip one
    of the error checks, so that they can verify the error really is reported.
    The disable is counted, so nested uses for the same check type stack.
    Objects of this class are handed out by ConsoleBase.disable_check(); the
    class is not intended to be used directly."""

    def __init__(self, console, check_type):
        self.check_type = check_type
        self.console = console

    def _adjust(self, delta):
        # Change the disable count for our check type, then let the console
        # recompute which bad patterns are currently active.
        counts = self.console.disable_check_count
        counts[self.check_type] += delta
        self.console.eval_bad_patterns()

    def __enter__(self):
        self._adjust(1)

    def __exit__(self, extype, value, traceback):
        self._adjust(-1)
58
class ConsoleSetupTimeout(object):
    """Context manager (for Python's with statement) that temporarily sets up
    a different timeout for specific commands. This is useful when a command's
    execution time is greater than the default timeout.

    The console's timeout is only touched while the with-body is active: the
    current value is saved and replaced on entry, and put back on exit (also
    when the body raises). Merely constructing the object has no effect on the
    console, and the same object may be entered more than once."""

    def __init__(self, console, timeout):
        """Remember which spawn object to adjust and the timeout to apply.

        Args:
            console: The console whose spawn object (console.p) is adjusted.
            timeout: The timeout value to use inside the with-body.
        """
        self.p = console.p
        self.timeout = timeout
        # Filled in by __enter__ so that the value restored on exit is the one
        # that was in force right before the with-body started.
        self.orig_timeout = None

    def __enter__(self):
        self.orig_timeout = self.p.timeout
        self.p.timeout = self.timeout
        return self

    def __exit__(self, extype, value, traceback):
        # Returns None, so any exception from the with-body keeps propagating.
        self.p.timeout = self.orig_timeout
74
75class ConsoleBase(object):
76    """The interface through which test functions interact with the U-Boot
77    console. This primarily involves executing shell commands, capturing their
    results, and checking for common error conditions. Some common utilities
    are also provided."""
80
81    def __init__(self, log, config, max_fifo_fill):
82        """Initialize a U-Boot console connection.
83
84        Can only usefully be called by sub-classes.
85
86        Args:
87            log: A mulptiplex_log.Logfile object, to which the U-Boot output
88                will be logged.
89            config: A configuration data structure, as built by conftest.py.
90            max_fifo_fill: The maximum number of characters to send to U-Boot
91                command-line before waiting for U-Boot to echo the characters
92                back. For UART-based HW without HW flow control, this value
93                should be set less than the UART RX FIFO size to avoid
94                overflow, assuming that U-Boot can't keep up with full-rate
95                traffic at the baud rate.
96
97        Returns:
98            Nothing.
99        """
100
101        self.log = log
102        self.config = config
103        self.max_fifo_fill = max_fifo_fill
104
105        self.logstream = self.log.get_stream('console', sys.stdout)
106
107        # Array slice removes leading/trailing quotes
108        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
109        self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
110        self.p = None
111        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
112        self.eval_bad_patterns()
113
114        self.at_prompt = False
115        self.at_prompt_logevt = None
116
117    def eval_bad_patterns(self):
118        self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \
119            if self.disable_check_count[pat[PAT_ID]] == 0]
120        self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \
121            if self.disable_check_count[pat[PAT_ID]] == 0]
122
123    def close(self):
124        """Terminate the connection to the U-Boot console.
125
126        This function is only useful once all interaction with U-Boot is
127        complete. Once this function is called, data cannot be sent to or
128        received from U-Boot.
129
130        Args:
131            None.
132
133        Returns:
134            Nothing.
135        """
136
137        if self.p:
138            self.p.close()
139        self.logstream.close()
140
141    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
142            wait_for_prompt=True):
143        """Execute a command via the U-Boot console.
144
145        The command is always sent to U-Boot.
146
147        U-Boot echoes any command back to its output, and this function
148        typically waits for that to occur. The wait can be disabled by setting
149        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
150        interrupt a long-running command such as "ums".
151
152        Command execution is typically triggered by sending a newline
153        character. This can be disabled by setting send_nl=False, which is
154        also useful when sending CTRL-C.
155
156        This function typically waits for the command to finish executing, and
157        returns the console output that it generated. This can be disabled by
158        setting wait_for_prompt=False, which is useful when invoking a long-
159        running command such as "ums".
160
161        Args:
162            cmd: The command to send.
163            wait_for_each: Boolean indicating whether to wait for U-Boot to
164                echo the command text back to its output.
165            send_nl: Boolean indicating whether to send a newline character
166                after the command string.
167            wait_for_prompt: Boolean indicating whether to wait for the
168                command prompt to be sent by U-Boot. This typically occurs
169                immediately after the command has been executed.
170
171        Returns:
172            If wait_for_prompt == False:
173                Nothing.
174            Else:
175                The output from U-Boot during command execution. In other
176                words, the text U-Boot emitted between the point it echod the
177                command string and emitted the subsequent command prompts.
178        """
179
180        if self.at_prompt and \
181                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
182            self.logstream.write(self.prompt, implicit=True)
183
184        try:
185            self.at_prompt = False
186            if send_nl:
187                cmd += '\n'
188            while cmd:
189                # Limit max outstanding data, so UART FIFOs don't overflow
190                chunk = cmd[:self.max_fifo_fill]
191                cmd = cmd[self.max_fifo_fill:]
192                self.p.send(chunk)
193                if not wait_for_echo:
194                    continue
195                chunk = re.escape(chunk)
196                chunk = chunk.replace('\\\n', '[\r\n]')
197                m = self.p.expect([chunk] + self.bad_patterns)
198                if m != 0:
199                    self.at_prompt = False
200                    raise Exception('Bad pattern found on console: ' +
201                                    self.bad_pattern_ids[m - 1])
202            if not wait_for_prompt:
203                return
204            m = self.p.expect([self.prompt_compiled] + self.bad_patterns)
205            if m != 0:
206                self.at_prompt = False
207                raise Exception('Bad pattern found on console: ' +
208                                self.bad_pattern_ids[m - 1])
209            self.at_prompt = True
210            self.at_prompt_logevt = self.logstream.logfile.cur_evt
211            # Only strip \r\n; space/TAB might be significant if testing
212            # indentation.
213            return self.p.before.strip('\r\n')
214        except Exception as ex:
215            self.log.error(str(ex))
216            self.cleanup_spawn()
217            raise
218
219    def run_command_list(self, cmds):
220        """Run a list of commands.
221
222        This is a helper function to call run_command() with default arguments
223        for each command in a list.
224
225        Args:
226            cmd: List of commands (each a string).
227        Returns:
228            A list of output strings from each command, one element for each
229            command.
230        """
231        output = []
232        for cmd in cmds:
233            output.append(self.run_command(cmd))
234        return output
235
236    def ctrlc(self):
237        """Send a CTRL-C character to U-Boot.
238
239        This is useful in order to stop execution of long-running synchronous
240        commands such as "ums".
241
242        Args:
243            None.
244
245        Returns:
246            Nothing.
247        """
248
249        self.log.action('Sending Ctrl-C')
250        self.run_command(chr(3), wait_for_echo=False, send_nl=False)
251
252    def wait_for(self, text):
253        """Wait for a pattern to be emitted by U-Boot.
254
255        This is useful when a long-running command such as "dfu" is executing,
256        and it periodically emits some text that should show up at a specific
257        location in the log file.
258
259        Args:
260            text: The text to wait for; either a string (containing raw text,
261                not a regular expression) or an re object.
262
263        Returns:
264            Nothing.
265        """
266
267        if type(text) == type(''):
268            text = re.escape(text)
269        m = self.p.expect([text] + self.bad_patterns)
270        if m != 0:
271            raise Exception('Bad pattern found on console: ' +
272                            self.bad_pattern_ids[m - 1])
273
274    def drain_console(self):
275        """Read from and log the U-Boot console for a short time.
276
277        U-Boot's console output is only logged when the test code actively
278        waits for U-Boot to emit specific data. There are cases where tests
279        can fail without doing this. For example, if a test asks U-Boot to
280        enable USB device mode, then polls until a host-side device node
281        exists. In such a case, it is useful to log U-Boot's console output
282        in case U-Boot printed clues as to why the host-side even did not
283        occur. This function will do that.
284
285        Args:
286            None.
287
288        Returns:
289            Nothing.
290        """
291
292        # If we are already not connected to U-Boot, there's nothing to drain.
293        # This should only happen when a previous call to run_command() or
294        # wait_for() failed (and hence the output has already been logged), or
295        # the system is shutting down.
296        if not self.p:
297            return
298
299        orig_timeout = self.p.timeout
300        try:
301            # Drain the log for a relatively short time.
302            self.p.timeout = 1000
303            # Wait for something U-Boot will likely never send. This will
304            # cause the console output to be read and logged.
305            self.p.expect(['This should never match U-Boot output'])
306        except u_boot_spawn.Timeout:
307            pass
308        finally:
309            self.p.timeout = orig_timeout
310
311    def ensure_spawned(self):
312        """Ensure a connection to a correctly running U-Boot instance.
313
314        This may require spawning a new Sandbox process or resetting target
315        hardware, as defined by the implementation sub-class.
316
317        This is an internal function and should not be called directly.
318
319        Args:
320            None.
321
322        Returns:
323            Nothing.
324        """
325
326        if self.p:
327            return
328        try:
329            self.log.start_section('Starting U-Boot')
330            self.at_prompt = False
331            self.p = self.get_spawn()
332            # Real targets can take a long time to scroll large amounts of
333            # text if LCD is enabled. This value may need tweaking in the
334            # future, possibly per-test to be optimal. This works for 'help'
335            # on board 'seaboard'.
336            if not self.config.gdbserver:
337                self.p.timeout = 30000
338            self.p.logfile_read = self.logstream
339            bcfg = self.config.buildconfig
340            config_spl = bcfg.get('config_spl', 'n') == 'y'
341            config_spl_serial_support = bcfg.get('config_spl_serial_support',
342                                                 'n') == 'y'
343            env_spl_skipped = self.config.env.get('env__spl_skipped',
344                                                  False)
345            if config_spl and config_spl_serial_support and not env_spl_skipped:
346                m = self.p.expect([pattern_u_boot_spl_signon] +
347                                  self.bad_patterns)
348                if m != 0:
349                    raise Exception('Bad pattern found on SPL console: ' +
350                                    self.bad_pattern_ids[m - 1])
351            m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns)
352            if m != 0:
353                raise Exception('Bad pattern found on console: ' +
354                                self.bad_pattern_ids[m - 1])
355            self.u_boot_version_string = self.p.after
356            while True:
357                m = self.p.expect([self.prompt_compiled,
358                    pattern_stop_autoboot_prompt] + self.bad_patterns)
359                if m == 0:
360                    break
361                if m == 1:
362                    self.p.send(' ')
363                    continue
364                raise Exception('Bad pattern found on console: ' +
365                                self.bad_pattern_ids[m - 2])
366            self.at_prompt = True
367            self.at_prompt_logevt = self.logstream.logfile.cur_evt
368        except Exception as ex:
369            self.log.error(str(ex))
370            self.cleanup_spawn()
371            raise
372        finally:
373            self.log.end_section('Starting U-Boot')
374
375    def cleanup_spawn(self):
376        """Shut down all interaction with the U-Boot instance.
377
378        This is used when an error is detected prior to re-establishing a
379        connection with a fresh U-Boot instance.
380
381        This is an internal function and should not be called directly.
382
383        Args:
384            None.
385
386        Returns:
387            Nothing.
388        """
389
390        try:
391            if self.p:
392                self.p.close()
393        except:
394            pass
395        self.p = None
396
397    def restart_uboot(self):
398        """Shut down and restart U-Boot."""
399        self.cleanup_spawn()
400        self.ensure_spawned()
401
402    def get_spawn_output(self):
403        """Return the start-up output from U-Boot
404
405        Returns:
406            The output produced by ensure_spawed(), as a string.
407        """
408        if self.p:
409            return self.p.get_expect_output()
410        return None
411
412    def validate_version_string_in_text(self, text):
413        """Assert that a command's output includes the U-Boot signon message.
414
415        This is primarily useful for validating the "version" command without
416        duplicating the signon text regex in a test function.
417
418        Args:
419            text: The command output text to check.
420
421        Returns:
422            Nothing. An exception is raised if the validation fails.
423        """
424
425        assert(self.u_boot_version_string in text)
426
427    def disable_check(self, check_type):
428        """Temporarily disable an error check of U-Boot's output.
429
430        Create a new context manager (for use with the "with" statement) which
431        temporarily disables a particular console output error check.
432
433        Args:
434            check_type: The type of error-check to disable. Valid values may
435            be found in self.disable_check_count above.
436
437        Returns:
438            A context manager object.
439        """
440
441        return ConsoleDisableCheck(self, check_type)
442
443    def temporary_timeout(self, timeout):
444        """Temporarily set up different timeout for commands.
445
446        Create a new context manager (for use with the "with" statement) which
447        temporarily change timeout.
448
449        Args:
450            timeout: Time in milliseconds.
451
452        Returns:
453            A context manager object.
454        """
455
456        return ConsoleSetupTimeout(self, timeout)
457