struct: unpack Requires String Argument of Length 16

11,747

Solution 1

recvPacket is bigger than your structure. If your structure is the first part of the data, unpack just the bytes of the structure:

pktFormat = 'bbHHhd'
pktSize = struct.calcsize(pktFormat)
... = struct.unpack(pktFormat, recvPacket[:pktSize])

Solution 2

The struct.unpack function requires that the data you pass to it match the format string's length exactly.

If you have a large buffer and you only want to decode part of it, consider using the struct.unpack_from function instead. It takes an additional argument specifying the offset to begin decoding at, and accepts buffers larger than the format string describes:

(request_code, request_type, checksum, packet_id, sequence,
 timeSent, data) = struct.unpack_from("bbHHhd", recvPacket, 0)

You may find this function useful if you want to decode other parts of the packet data after parsing the header.

Share:
11,747
Matt
Author by

Matt

CS student at NYU-Polytechnic

Updated on June 04, 2022

Comments

  • Matt
    Matt almost 2 years

    For my Computer Networking class, I'm trying to implement Traceroute using raw sockets with the ICMP protocol. I need to build a packet and then unpack the response packet using the Python struct class. Here is the code for building the packet:

    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, myChecksum, pid, 1)
    data = struct.pack("d", time.time())
    packet = header + data
    

    Later, I receive an ICMP packet in the same format with the confirmation. Here is the code for unpacking the packet:

    request_code, request_type, checksum, packet_id, \
                    sequence, timeSent, data = struct.unpack("bbHHhd", recvPacket)
    

    But I'm getting the following error: struct.error: unpack requires a string argument of length 16.

    I don't understand because when I check struct.calcsize() for the format string, it returns 16.

    Here is my full program if you would like to run it on your machine

    from socket import *
    import socket
    import os
    import sys
    import struct
    import time
    import select
    import binascii
    
    ICMP_ECHO_REQUEST = 8
    MAX_HOPS = 30
    TIMEOUT = 2.0
    TRIES = 2
    
    # The packet that we shall send to each router along the path is the ICMP echo
    # request packet, which is exactly what we had used in the ICMP ping exercise.
    # We shall use the same packet that we built in the Ping exercise
    
    def checksum(str):
        csum = 0
        countTo = (len(str) / 2) * 2
        count = 0
    
        while count < countTo:
            thisVal = ord(str[count+1]) * 256 + ord(str[count])
            csum = csum + thisVal
            csum = csum & 0xffffffffL
            count = count + 2
    
        if countTo < len(str):
            csum = csum + ord(str[len(str) - 1])
            csum = csum & 0xffffffffL
    
        csum = (csum >> 16) + (csum & 0xffff)
        csum = csum + (csum >> 16)
        answer = ~csum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer
    
    def build_packet():
        # In the sendOnePing() method of the ICMP Ping exercise ,firstly the header of our
        # packet to be sent was made, secondly the checksum was appended to the header and
        # then finally the complete packet was sent to the destination.
    
        # Make the header in a similar way to the ping exercise.
        # Header is type (8), code (8), checksum (16), id (16), sequence (16)
        myChecksum = 0
        pid = os.getpid() & 0xFFFF
    
        # Make a dummy header with a 0 checksum.
        # struct -- Interpret strings as packed binary data
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, myChecksum, pid, 1)
        #header = struct.pack("!HHHHH", ICMP_ECHO_REQUEST, 0, myChecksum, pid, 1)
        data = struct.pack("d", time.time())
    
        # Calculate the checksum on the data and the dummy header.
        # Append checksum to the header.
        myChecksum = checksum(header + data)    
        if sys.platform == 'darwin':
            myChecksum = socket.htons(myChecksum) & 0xffff
            #Convert 16-bit integers from host to network byte order.
        else:
            myChecksum = htons(myChecksum)
    
        packet = header + data
        return packet
    
    def get_route(hostname):
        timeLeft = TIMEOUT
        for ttl in xrange(1,MAX_HOPS):
            for tries in xrange(TRIES):
                destAddr = socket.gethostbyname(hostname)
                #Fill in start
                # Make a raw socket named mySocket
                mySocket = socket.socket(AF_INET, SOCK_RAW, getprotobyname("icmp"))
                mySocket.bind(("", 12000));
                #Fill in end
                mySocket.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, struct.pack('I', ttl))
                mySocket.settimeout(TIMEOUT)
                try:
                    d = build_packet()
                    mySocket.sendto(d, (hostname, 0))
                    t = time.time()
                    startedSelect = time.time()
                    whatReady = select.select([mySocket], [], [], timeLeft)
                    howLongInSelect = (time.time() - startedSelect)
                    if whatReady[0] == []: # Timeout
                        print "*    *    * Request timed out."
    
                    recvPacket, addr = mySocket.recvfrom(1024)
                    print addr
                    timeReceived = time.time()
                    timeLeft = timeLeft - howLongInSelect
                    if timeLeft <= 0:
                        print "*    *    * Request timed out."
                except socket.timeout:
                    continue
                else:
                    #Fill in start
                    # Fetch the icmp type from the IP packet
                    print struct.calcsize("bbHHhd")
                    request_code, request_type, checksum, packet_id, \
                        sequence, timeSent, data = struct.unpack("bbHHhd", recvPacket)
                    #Fill in end
    
                    if request_type == 11:
                        bytes = struct.calcsize("d")
                        timeSent = struct.unpack("d", recvPacket[28:28 + bytes])[0]
                        print " %d   rtt=%.0f ms %s" % (ttl,(timeReceived -t)*1000, addr[0])
                    elif request_type == 3:
                        bytes = struct.calcsize("d")
                        timeSent = struct.unpack("d", recvPacket[28:28 + bytes])[0]
                        print " %d   rtt=%.0f ms %s" % (ttl,(timeReceived -t)*1000, addr[0])
                    elif request_type == 0:
                        bytes = struct.calcsize("d")
                        timeSent = struct.unpack("d", recvPacket[28:28 + bytes])[0]
                        print " %d   rtt=%.0f ms %s" % (ttl,(timeReceived -timeSent)*1000, addr[0])
                        return
                    else:
                        print "error"
                        break
                finally:
                    mySocket.close()
    
    get_route("www.google.com")
    
  • James Henstridge
    James Henstridge over 11 years
    Or alternatively use struct.unpack_from, which avoids the copy and can be used to decode other parts of the buffer.
  • Karl Knechtel
    Karl Knechtel over 11 years
    @JamesHenstridge Please write this up as an answer.
  • Xofo
    Xofo over 5 years
    Wonderful answer. I will ask (for the peanut gallery), is there a Perl equivalent (given that the pack and unpack functions have Perl variants)?