Integration test in Python
It depends a bit on the application you are testing. If your application is an API, an integration test can simply call the different endpoints and compare the responses with the expected results.
You could write an IntegrationTest class with suitable write_result functions for recording the outcome of each check.
A test method could then look like this:
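import requests

# Method of the IntegrationTest class
def test_get_users_status(self):
    expected_result = {"user1": "active", "user2": "inactive"}
    r = requests.get('https://localhost:8080/get_users_status', auth=('user', 'pass'))
    # Compare by value: "is" checks object identity and is not reliable for integers
    assert r.status_code == 200
    self.write_json(expected_result, r.json())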
You can also add functions to the integration class that run groups of tests: all tests, all authentication-related tests, and so on.
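As a rough illustration of the approach above, here is a minimal, self-contained sketch built on unittest. It is not the exact setup from the answer: the FakeApiHandler server is a hypothetical stand-in for the real application, urllib replaces requests so that only the standard library is needed, and run_suite is an illustrative helper name.

```python
import json
import threading
import unittest
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical data served by the stand-in application.
EXPECTED_STATUS = {"user1": "active", "user2": "inactive"}


class FakeApiHandler(BaseHTTPRequestHandler):
    """Stand-in for the real API (an assumption, not the application under test)."""

    def do_GET(self):
        if self.path == "/get_users_status":
            body = json.dumps(EXPECTED_STATUS).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        pass  # keep the test output quiet


class IntegrationTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Port 0 lets the operating system pick a free port.
        cls.server = HTTPServer(("127.0.0.1", 0), FakeApiHandler)
        cls.base_url = "http://127.0.0.1:%d" % cls.server.server_address[1]
        cls.thread = threading.Thread(target=cls.server.serve_forever, daemon=True)
        cls.thread.start()

    @classmethod
    def tearDownClass(cls):
        cls.server.shutdown()
        cls.server.server_close()
        cls.thread.join(timeout=5)

    def test_get_users_status(self):
        url = self.base_url + "/get_users_status"
        with urllib.request.urlopen(url, timeout=5) as r:
            self.assertEqual(r.status, 200)
            self.assertEqual(json.loads(r.read().decode("utf-8")), EXPECTED_STATUS)


def run_suite():
    """Run every test in IntegrationTest and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(IntegrationTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

If this were saved as, say, test_integration.py and pytest were installed, running `pytest test_integration.py --junitxml=results.xml` should collect the TestCase and write the results as JUnit XML.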
iAmoric
Updated on June 14, 2022
Comments
-
iAmoric almost 2 years
I have a Python project that I want to test. I already have unit tests written with unittest, but I also need integration tests. For that, I have two applications: the real one that I have to test, and a "testing" application that sends requests to the first one, waits for the response, and then compares it with the expected result.
That way I will be able to check that the application responds correctly to the requests.
For the moment, I have what I described above, but in main.py (not in a dedicated test file). Also, the comparison is just done with the print function, so that I can see that it works. But I need to execute these tests and get the results in a conventional format, like JUnit XML.
How can I write, run, and get the results of these tests?
EDIT
I'm developing an Azure IoT Edge module, and I'm using routes to wire the modules together. Here is the code of the testing module, where I need to execute the tests:
import random
import time
import sys
import iothub_client
import json
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError

# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
    print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )

# receive_message_callback is invoked when an incoming message arrives on INPUT queue
def receive_message_callback(message, hubManager):
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    message_text = message_buffer[:size].decode('utf-8')
    data = json.loads(message_text)
    result = data["result"]
    print ("expected_result: %d; result: %d ==> %r" %(EXPECTED_RESULT, result, EXPECTED_RESULT==result))

class HubManager(object):
    def __init__(self, protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
        # sets the callback when a message arrives on INPUT queue.
        self.client.set_message_callback(INPUT, receive_message_callback, self)

    # Forwards the message received onto the next stage in the process.
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)

def main(protocol):
    try:
        hub_manager = HubManager(protocol)
        # Send request
        message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
        msg_txt_formatted = IoTHubMessage(message)
        hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)
        while True:
            time.sleep(1)
    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubModuleClient sample stopped" )

if __name__ == '__main__':
    main(PROTOCOL)
-
iAmoric about 5 years
OK. Actually, I'm developing an Azure IoT Edge module, and the way to send requests and get results is not so simple (there are callbacks). I've added the code to my question to explain how it works.
-
trippelganger about 5 years
@iAmoric Can you yield the function calls?
-
iAmoric about 5 years
Which function? I can call forward_event_to_output manually, but not receive_message_callback. It's a callback function that is called automatically when a new message is put in the queue.
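One possible way to test a callback that the test cannot call directly is to have the callback put its result on a queue.Queue and let the test block on that queue with a timeout. The sketch below only illustrates the idea. FakeModuleClient is a hypothetical stand-in, not the real IoTHubModuleClient API, and its behaviour of replying with the sum of param1 and param2 is an assumption made purely for the example.

```python
import json
import queue
import threading
import unittest


class FakeModuleClient:
    """Hypothetical stand-in for the module client (not the real iothub_client API).

    It answers each request from a background thread by invoking the
    registered callback, which mimics a callback fired by the runtime.
    """

    def __init__(self):
        self._callback = None
        self._context = None

    def set_message_callback(self, callback, context):
        self._callback = callback
        self._context = context

    def send_event_async(self, payload):
        def reply():
            params = json.loads(payload)
            # Assumed behaviour of the module under test: return the sum.
            response = json.dumps({"result": params["param1"] + params["param2"]})
            self._callback(response, self._context)

        threading.Thread(target=reply, daemon=True).start()


def receive_message_callback(message_text, results):
    # Instead of printing, hand the decoded result to the waiting test.
    results.put(json.loads(message_text)["result"])


class ModuleIntegrationTest(unittest.TestCase):
    def test_sum_request(self):
        results = queue.Queue()
        client = FakeModuleClient()
        client.set_message_callback(receive_message_callback, results)
        client.send_event_async(json.dumps({"param1": 2, "param2": 3}))
        # Block until the callback fires; fail instead of hanging forever.
        try:
            result = results.get(timeout=5)
        except queue.Empty:
            self.fail("no response received within 5 seconds")
        self.assertEqual(result, 5)


def run_callback_suite():
    """Run ModuleIntegrationTest and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ModuleIntegrationTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

With the real client, the same pattern would presumably apply: pass the queue as the user context when registering the callback, send the request, then wait on the queue inside the test method.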