xref: /openbmc/openbmc/poky/bitbake/lib/toaster/tests/functional/functional_helpers.py (revision 8460358c3d24c71d9d38fd126c745854a6301564)
1#! /usr/bin/env python3
2#
3# BitBake Toaster functional tests implementation
4#
5# Copyright (C) 2017 Intel Corporation
6#
7# SPDX-License-Identifier: GPL-2.0-only
8#
9
10import os
11import logging
12import subprocess
13import signal
14import re
15import requests
16
17from django.urls import reverse
18from tests.browser.selenium_helpers_base import SeleniumTestCaseBase
19from selenium.webdriver.common.by import By
20from selenium.webdriver.support.select import Select
21from selenium.common.exceptions import NoSuchElementException
22
23logger = logging.getLogger("toaster")
24toaster_processes = []
25
26class SeleniumFunctionalTestCase(SeleniumTestCaseBase):
27    wait_toaster_time = 10
28
29    @classmethod
30    def setUpClass(cls):
31        # So that the buildinfo helper uses the test database'
32        if os.environ.get('DJANGO_SETTINGS_MODULE', '') != \
33            'toastermain.settings_test':
34            raise RuntimeError("Please initialise django with the tests settings:  "
35                "DJANGO_SETTINGS_MODULE='toastermain.settings_test'")
36
37        # Wait for any known toaster processes to exit
38        global toaster_processes
39        for toaster_process in toaster_processes:
40            try:
41                os.waitpid(toaster_process, os.WNOHANG)
42            except ChildProcessError:
43                pass
44
45        # start toaster
46        cmd = "bash -c 'source toaster start'"
47        start_process = subprocess.Popen(
48            cmd,
49            cwd=os.environ.get("BUILDDIR"),
50            shell=True)
51        toaster_processes = [start_process.pid]
52        if start_process.wait() != 0:
53            port_use = os.popen("lsof -i -P -n | grep '8000 (LISTEN)'").read().strip()
54            message = ''
55            if port_use:
56                process_id = port_use.split()[1]
57                process = os.popen(f"ps -o cmd= -p {process_id}").read().strip()
58                message = f"Port 8000 occupied by {process}"
59            raise RuntimeError(f"Can't initialize toaster. {message}")
60
61        builddir = os.environ.get("BUILDDIR")
62        with open(os.path.join(builddir, '.toastermain.pid'), 'r') as f:
63            toaster_processes.append(int(f.read()))
64        with open(os.path.join(builddir, '.runbuilds.pid'), 'r') as f:
65            toaster_processes.append(int(f.read()))
66
67        super(SeleniumFunctionalTestCase, cls).setUpClass()
68        cls.live_server_url = 'http://localhost:8000/'
69
70    @classmethod
71    def tearDownClass(cls):
72        super(SeleniumFunctionalTestCase, cls).tearDownClass()
73
74        global toaster_processes
75
76        cmd = "bash -c 'source toaster stop'"
77        stop_process = subprocess.Popen(
78            cmd,
79            cwd=os.environ.get("BUILDDIR"),
80            shell=True)
81        # Toaster stop has been known to hang in these tests so force kill if it stalls
82        try:
83            if stop_process.wait(cls.wait_toaster_time) != 0:
84                raise Exception('Toaster stop process failed')
85        except Exception as e:
86            if e is subprocess.TimeoutExpired:
87                print('Toaster stop process took too long. Force killing toaster...')
88            else:
89                print('Toaster stop process failed. Force killing toaster...')
90            stop_process.kill()
91            for toaster_process in toaster_processes:
92                os.kill(toaster_process, signal.SIGTERM)
93
94
95    def get_URL(self):
96         rc=self.get_page_source()
97         project_url=re.search(r"(projectPageUrl\s:\s\")(.*)(\",)",rc)
98         return project_url.group(2)
99
100
101    def find_element_by_link_text_in_table(self, table_id, link_text):
102        """
103        Assume there're multiple suitable "find_element_by_link_text".
104        In this circumstance we need to specify "table".
105        """
106        try:
107            table_element = self.get_table_element(table_id)
108            element = table_element.find_element(By.LINK_TEXT, link_text)
109        except NoSuchElementException:
110            print('no element found')
111            raise
112        return element
113
114    def get_table_element(self, table_id, *coordinate):
115        if len(coordinate) == 0:
116#return whole-table element
117            element_xpath = "//*[@id='" + table_id + "']"
118            try:
119                element = self.driver.find_element(By.XPATH, element_xpath)
120            except NoSuchElementException:
121                raise
122            return element
123        row = coordinate[0]
124
125        if len(coordinate) == 1:
126#return whole-row element
127            element_xpath = "//*[@id='" + table_id + "']/tbody/tr[" + str(row) + "]"
128            try:
129                element = self.driver.find_element(By.XPATH, element_xpath)
130            except NoSuchElementException:
131                return False
132            return element
133#now we are looking for an element with specified X and Y
134        column = coordinate[1]
135
136        element_xpath = "//*[@id='" + table_id + "']/tbody/tr[" + str(row) + "]/td[" + str(column) + "]"
137        try:
138            element = self.driver.find_element(By.XPATH, element_xpath)
139        except NoSuchElementException:
140            return False
141        return element
142
143    def create_new_project(
144        self,
145        project_name,
146        release,
147        release_title,
148        merge_toaster_settings,
149    ):
150        """ Create/Test new project using:
151          - Project Name: Any string
152          - Release: Any string
153          - Merge Toaster settings: True or False
154        """
155
156        # Obtain a CSRF token from a suitable URL
157        projs = requests.get(self.live_server_url + reverse('newproject'))
158        csrftoken = projs.cookies.get('csrftoken')
159
160        # Use the projects typeahead to find out if the project already exists
161        req = requests.get(self.live_server_url + reverse('xhr_projectstypeahead'), {'search': project_name, 'format' : 'json'})
162        data = req.json()
163        # Delete any existing projects
164        for result in data['results']:
165            del_url = reverse('xhr_project', args=(result['id'],))
166            del_response = requests.delete(self.live_server_url + del_url, cookies={'csrftoken': csrftoken}, headers={'X-CSRFToken': csrftoken})
167            self.assertEqual(del_response.status_code, 200)
168
169        self.get(reverse('newproject'))
170        self.wait_until_visible('#new-project-name')
171        self.driver.find_element(By.ID,
172                                 "new-project-name").send_keys(project_name)
173
174        select = Select(self.find('#projectversion'))
175        select.select_by_value(release)
176
177        # check merge toaster settings
178        checkbox = self.find('.checkbox-mergeattr')
179        if merge_toaster_settings:
180            if not checkbox.is_selected():
181                checkbox.click()
182        else:
183            if checkbox.is_selected():
184                checkbox.click()
185
186        self.wait_until_clickable('#create-project-button')
187
188        self.driver.find_element(By.ID, "create-project-button").click()
189
190        element = self.wait_until_visible('#project-created-notification')
191        self.assertTrue(
192            self.element_exists('#project-created-notification'),
193            f"Project:{project_name} creation notification not shown"
194        )
195        self.assertTrue(
196            project_name in element.text,
197            f"New project name:{project_name} not in new project notification"
198        )
199
200        # Use the projects typeahead again to check the project now exists
201        req = requests.get(self.live_server_url + reverse('xhr_projectstypeahead'), {'search': project_name, 'format' : 'json'})
202        data = req.json()
203        self.assertGreater(len(data['results']), 0, f"New project:{project_name} not found in database")
204
205        project_id = data['results'][0]['id']
206
207        self.wait_until_visible('#project-release-title')
208
209        # check release
210        if release_title is not None:
211            self.assertTrue(re.search(
212                release_title,
213                self.driver.find_element(By.XPATH,
214                                     "//span[@id='project-release-title']"
215                                     ).text),
216                            'The project release is not defined')
217
218        return project_id
219
220    def load_projects_page_helper(self):
221        self.wait_until_present('#projectstable')
222        # Need to wait for some data in the table too
223        self.wait_until_present('td[class="updated"]')
224
225