Integration test in Python


It depends somewhat on the application you are testing. If your application is an API, an integration test can simply call the different endpoints and compare the responses with the expected results.

You could write an IntegrationTest class with the write_result functions you need, for example one per output format, such as the write_json method used below.

So a function could be:

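import requests

def test_get_users_status(self):
    # Method of the IntegrationTest class; write_json records the comparison.
    expected_result = {"user1": "active", "user2": "inactive"}
    r = requests.get('https://localhost:8080/get_users_status', auth=('user', 'pass'))
    # Compare integers with ==; `is` checks object identity, not value.
    assert r.status_code == 200
    self.write_json(expected_result, r.json())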

The integration class can also provide functions that run groups of tests: all tests, all authentication-related tests, and so on.
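As a rough illustration of such a class, here is a minimal sketch that runs test methods by name prefix and writes the outcomes as a JUnit-style XML report. The method names `run` and `write_junit_xml`, the behaviour of `write_result`, and the `ExampleTests` subclass are all assumptions made for this example; they are not taken from an existing library.

```python
import time
import traceback
import xml.etree.ElementTree as ET


class IntegrationTest:
    """Runs test methods and collects their outcomes (hypothetical sketch)."""

    def __init__(self):
        # Each entry is (test name, duration in seconds, failure text or None).
        self.results = []

    def write_result(self, expected, actual):
        # Assumed behaviour: fail the current test when the values differ.
        assert expected == actual, "expected %r, got %r" % (expected, actual)

    def run(self, prefix="test_"):
        # Run every method whose name starts with `prefix`, in name order.
        # A narrower prefix such as "test_auth_" selects one group of tests.
        for name in sorted(dir(self)):
            if not name.startswith(prefix):
                continue
            method = getattr(self, name)
            if not callable(method):
                continue
            start = time.time()
            try:
                method()
                failure = None
            except Exception:
                failure = traceback.format_exc()
            self.results.append((name, time.time() - start, failure))

    def write_junit_xml(self, path):
        # Write one <testsuite> element with one <testcase> per executed test.
        class_name = type(self).__name__
        failures = sum(1 for _, _, failure in self.results if failure)
        suite = ET.Element("testsuite", name=class_name,
                           tests=str(len(self.results)),
                           failures=str(failures))
        for name, seconds, failure in self.results:
            case = ET.SubElement(suite, "testcase", classname=class_name,
                                 name=name, time="%.3f" % seconds)
            if failure:
                node = ET.SubElement(case, "failure", message="test failed")
                node.text = failure
        ET.ElementTree(suite).write(path, encoding="utf-8",
                                    xml_declaration=True)


class ExampleTests(IntegrationTest):
    # Illustrative tests with hard-coded values instead of real responses.
    def test_status_matches(self):
        self.write_result({"user1": "active"}, {"user1": "active"})

    def test_status_differs(self):
        self.write_result({"user1": "active"}, {"user1": "inactive"})
```

Calling `tests = ExampleTests()`, then `tests.run()` and `tests.write_junit_xml("results.xml")`, should produce a report with two test cases, one of them failed. If pytest is an option for the project, its `--junitxml` flag produces this kind of report without custom code.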

Author: iAmoric

Updated on June 14, 2022

Comments

  • iAmoric
    iAmoric almost 2 years

    I have a Python project that I want to test. I already have unit tests written with unittest, but I need to do integration tests.

    For that, I have two applications: the real one that I have to test, and a "testing" application that sends requests to the first one, waits for the response and then compares it with the expected result.

    That way I will be able to test whether the application responds correctly to the requests.

    For the moment, I have what I described above, but in main.py (not a dedicated test file). Also, the comparison is only done with the print function, so that I can see that it works. But I need to execute these tests and get the results in a conventional format, such as JUnit XML.

    How can I write and run these tests, and get their results?

    EDIT

    I'm developing an Azure IoT Edge module, and I'm using routes to wire the modules together. Here is the code of the testing module, where I need to execute the tests:

    import random
    import time
    import sys
    import iothub_client
    import json
    
    # pylint: disable=E0611
    from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
    from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
    
    # Callback received when the message that we're forwarding is processed.
    def send_confirmation_callback(message, result, user_context):
        print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
    
    # receive_message_callback is invoked when an incoming message arrives on INPUT queue
    def receive_message_callback(message, hubManager):
        message_buffer = message.get_bytearray()
        size = len(message_buffer)
        message_text = message_buffer[:size].decode('utf-8')
        data = json.loads(message_text)
        result = data["result"]
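        print ("expected_result: %d; result: %d ==> %r" %(EXPECTED_RESULT, result, EXPECTED_RESULT==result))
        # Acknowledge the message; the SDK expects a disposition result from this callback.
        return IoTHubMessageDispositionResult.ACCEPTED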
    
    class HubManager(object):
        def __init__(self, protocol=IoTHubTransportProvider.MQTT):
            self.client_protocol = protocol
            self.client = IoTHubModuleClient()
            self.client.create_from_environment(protocol)
            self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
            # sets the callback when a message arrives on INPUT queue.
            self.client.set_message_callback(INPUT, receive_message_callback, self)
    
        # Forwards the message received onto the next stage in the process.
        def forward_event_to_output(self, outputQueueName, event, send_context):
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
    
    def main(protocol):
        try:
            hub_manager = HubManager(protocol)
    
            # Send request 
            message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
            msg_txt_formatted = IoTHubMessage(message)
            hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)
    
            while True:
                time.sleep(1)
    
        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubModuleClient sample stopped" )
    
    if __name__ == '__main__':
        main(PROTOCOL)
    
  • iAmoric
    iAmoric about 5 years
    OK. Actually, I'm developing an Azure IoT Edge module, and the way to send requests and get results is not so simple (there are callbacks). I've added the code to my question to explain how it works.
  • trippelganger
    trippelganger about 5 years
    @iAmoric Can you yield the function calls?
  • iAmoric
    iAmoric about 5 years
    Which function? I can call forward_event_to_output manually, but not receive_message_callback. It is a callback function that is called automatically when a new message is put in the queue.
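
One possible way to test code whose response arrives through a callback is to have the callback put each message on a queue and let the test block on that queue with a timeout. The sketch below uses only the standard library. `ResponseCollector`, `FakeModule` and the addition it performs are assumptions made for illustration; they stand in for the real module and the IoT Hub client, which are not used here.

```python
import queue
import threading
import unittest


class ResponseCollector:
    """Hands messages from a callback thread over to the waiting test."""

    def __init__(self):
        self._responses = queue.Queue()

    def callback(self, message, user_context=None):
        # Registered as the receive callback; it only stores the message.
        self._responses.put(message)

    def wait_for_response(self, timeout=5.0):
        # Blocks until a message arrives; raises queue.Empty on timeout.
        return self._responses.get(timeout=timeout)


class FakeModule:
    """Hypothetical stand-in for the application under test."""

    def __init__(self, on_message):
        self._on_message = on_message

    def send_request(self, param1, param2):
        # Reply from another thread after a short delay, the way a message
        # broker would invoke the callback asynchronously.
        reply = {"result": param1 + param2}
        threading.Timer(0.05, self._on_message, args=(reply,)).start()


class AdditionTest(unittest.TestCase):
    def test_result_matches_expected(self):
        collector = ResponseCollector()
        module = FakeModule(collector.callback)
        module.send_request(2, 3)
        response = collector.wait_for_response(timeout=2.0)
        self.assertEqual(response["result"], 5)
```

In the module above, receive_message_callback would play the role of `ResponseCollector.callback`: it would decode the message and put the parsed data on the queue instead of printing it. Because the tests are then ordinary unittest test cases, they can be run by any runner that is able to write JUnit XML.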