C# Begin/EndReceive - how do I read large data?

Solution 1

No - call BeginReceive again from the callback handler, until EndReceive returns 0. Basically, you should keep on receiving asynchronously, assuming you want the fullest benefit of asynchronous IO.

If you look at the MSDN page for Socket.BeginReceive you'll see an example of this. (Admittedly it's not as easy to follow as it might be.)
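
As a rough illustration of that chaining (not the MSDN sample itself), here is a minimal sketch. The `ChainedReceiver` class and `ReceiveAllAsync` helper are hypothetical names, and the sketch assumes the sender closes its side of the connection when it has finished sending, which is what makes EndReceive eventually return 0.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class ChainedReceiver
{
    // Hypothetical helper: collects everything the peer sends until it closes the connection.
    public static Task<byte[]> ReceiveAllAsync(Socket socket, int bufferSize = 1024)
    {
        var done = new TaskCompletionSource<byte[]>();
        var received = new MemoryStream();
        var buffer = new byte[bufferSize];

        AsyncCallback callback = null;
        callback = ar =>
        {
            try
            {
                int read = socket.EndReceive(ar);
                if (read == 0)
                {
                    // 0 bytes means the remote side shut down its end: nothing more will arrive.
                    done.TrySetResult(received.ToArray());
                    return;
                }

                // Keep only the bytes that were actually filled in this round.
                received.Write(buffer, 0, read);

                // Chain the next receive from inside the callback.
                socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, callback, null);
            }
            catch (Exception e)
            {
                // Surface socket errors (including disposal) through the returned task.
                done.TrySetException(e);
            }
        };

        // Post the first receive; every later one is posted by the callback.
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, callback, null);
        return done.Task;
    }
}
```

The returned task completes only when the peer closes the connection. On a persistent connection EndReceive never returns 0, so the end of a message has to be detected some other way, for example with a length prefix.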

Solution 2

Dang. I'm hesitant to even reply to this given the dignitaries that have already weighed in, but here goes. Be gentle, O Great Ones!

Without having the benefit of reading Marc's blog (it's blocked here due to the corporate internet policy), I'm going to offer "another way."

The trick, in my mind, is to separate the receipt of the data from the processing of that data.

I use a StateObject class defined like this. It differs from the MSDN StateObject implementation in that it does not include the StringBuilder object, the BUFFER_SIZE constant is private, and it includes a constructor for convenience.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

I also have a Packet class that is simply a wrapper around a buffer and a timestamp.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

My ReceiveCallback() function looks like this.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

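        if (read > 0) {
            // Copy only the bytes that were actually received into a new Packet.
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();

            // Queue the next receive only while the connection is still open.
            so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, SocketFlags.None, ReceiveCallback, so);
        }
        // read == 0 means the remote side closed the connection, so no further receive is queued.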
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

Notice that this implementation does absolutely no processing of the received data, nor does it have any expectations as to how many bytes are supposed to have been received. It simply receives whatever data happens to be on the socket (up to 65535 bytes) and stores that data in the packet list, and then it immediately queues up another asynchronous receive.

Since processing no longer occurs in the thread that handles each asynchronous receive, the data will obviously be processed by a different thread, which is why the Add() operation is synchronized via the lock statement. In addition, the processing thread (whether it is the main thread or some other dedicated thread) needs to know when there is data to process. To do this, I usually use a ManualResetEvent, which is what I've shown above.

Here is how the processing works.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List<Packet> packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List<Packet>();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

That's the basic infrastructure I use for all of my socket communication. It provides a nice separation between the receipt of the data and the processing of that data.

As to the other question you had, it is important to remember with this approach that each Packet instance does not necessarily represent a complete message within the context of your application. A Packet instance might contain a partial message, a single message, or multiple messages, and your messages might span multiple Packet instances. I've addressed how to know when you've received a full message in the related question you posted here.
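
To make the partial/multiple-message point concrete, here is a minimal sketch of reassembling messages from received chunks. It is not part of the infrastructure above: the `MessageAssembler` class is a hypothetical name, and the framing (a 4-byte big-endian length prefix before each message) is purely an assumption for illustration. Your protocol may frame messages differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: turns a stream of arbitrary chunks into complete messages.
public class MessageAssembler
{
    // Bytes received so far that do not yet form a complete message.
    private readonly List<byte> pending = new List<byte>();

    // Appends one received chunk and returns every message that is now complete.
    public List<byte[]> Append(byte[] chunk, int count)
    {
        for (int i = 0; i < count; i++)
        {
            pending.Add(chunk[i]);
        }

        var messages = new List<byte[]>();
        while (pending.Count >= 4)
        {
            // Assumed framing: 4-byte big-endian payload length.
            int length = (pending[0] << 24) | (pending[1] << 16) | (pending[2] << 8) | pending[3];
            if (length < 0)
            {
                throw new InvalidOperationException("Invalid length prefix.");
            }
            if (pending.Count - 4 < length)
            {
                break; // The message is still incomplete; wait for more chunks.
            }

            messages.Add(pending.GetRange(4, length).ToArray());
            pending.RemoveRange(0, 4 + length);
        }
        return messages;
    }
}
```

In the processing loop, each Packet would be fed in with something like `assembler.Append(packet.Buffer, packet.Buffer.Length)`. A chunk may yield zero, one, or several messages, and any leftover bytes stay buffered until the next chunk arrives.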

Solution 3

You would read the length prefix first. Once you have that, you would just keep reading the bytes in blocks (and you can do this async, as you surmised) until you have exhausted the number of bytes you know are coming in off the wire.

Note that at some point, when reading the last block you won't want to read the full 1024 bytes, depending on what the length-prefix says the total is, and how many bytes you have read.
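
A minimal sketch of that block-sizing logic follows. It is written synchronously against a `Stream` for brevity, although the same arithmetic applies to the count passed to BeginReceive. The `LengthPrefixedReader` class, the 4-byte big-endian prefix, and the default block size of 1024 are assumptions for illustration, not details taken from the question.

```csharp
using System;
using System.IO;

public static class LengthPrefixedReader
{
    // Fills buffer[offset .. offset + count) or throws if the stream ends first.
    private static void Fill(Stream stream, byte[] buffer, int offset, int count)
    {
        while (count > 0)
        {
            int read = stream.Read(buffer, offset, count);
            if (read == 0) throw new EndOfStreamException();
            offset += read;
            count -= read;
        }
    }

    // Reads one message framed with an assumed 4-byte big-endian length prefix.
    public static byte[] ReadMessage(Stream stream, int blockSize = 1024)
    {
        var prefix = new byte[4];
        Fill(stream, prefix, 0, prefix.Length);
        int length = (prefix[0] << 24) | (prefix[1] << 16) | (prefix[2] << 8) | prefix[3];
        if (length < 0) throw new InvalidDataException("Invalid length prefix.");

        var message = new byte[length];
        int total = 0;
        while (total < length)
        {
            // Never request more than what is left of this message, so the
            // last block is usually smaller than blockSize.
            int toRead = Math.Min(blockSize, length - total);
            int read = stream.Read(message, total, toRead);
            if (read == 0) throw new EndOfStreamException();
            total += read;
        }
        return message;
    }
}
```

Capping each request at `length - total` is what keeps the reader from consuming bytes that belong to the next message on the same connection.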

Solution 4

I ran into the same problem.

After testing several times, I found that chaining multiple BeginReceive/EndReceive calls sometimes lost packets. (The loop ended improperly.)

In my case, I used two solutions.

First, I defined a packet size large enough that a single BeginReceive()/EndReceive() pair was sufficient.

Second, when receiving large amounts of data, I used NetworkStream.Read() instead of BeginReceive()/EndReceive().

Asynchronous sockets are not easy to use, and they require a solid understanding of how sockets work.

Priyanka
Author by

Priyanka

Updated on July 05, 2022

Comments

  • Priyanka
    Priyanka almost 2 years

    I am unable to create unique partitions. When I upload data, it creates the same dates as separate partitions again and again, even though the dates are the same.

    create table product_order1(id int,user_id int,amount int,product string, city string, txn_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
    

    OK Time taken: 0.133 seconds

        LOAD DATA LOCAL INPATH 'txn' INTO TABLE product_order1;
        Loading data to table oct19.product_order1
        Table oct19.product_order1 stats: [numFiles=1, totalSize=303]
    OK
    

    Time taken: 0.426 seconds

        hive> 
        > set hive.exec.dynamic.partition = true;
        hive> 
        > set hive.exec.dynamic.partition.mode = true;
    
        hive> 
        > create table dyn_part(id int,user_id int,amount int,product string,city string) PARTITIONED BY(txn_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
    OK
    

    Time taken: 0.14 seconds

        hive >
    INSERT OVERWRITE TABLE dyn_part PARTITION(txn_date) select id,user_id,amount,product,city,txn_date from product_order1;
    

    Result I received:

        Loading data to table oct19.dyn_part partition (txn_date=null)
         Time taken for load dynamic partitions : 944
        Loading partition {txn_date=04-02-2015}
        Loading partition {txn_date= 03-04-2015}
        Loading partition {txn_date=01-02-2015}
        Loading partition {txn_date=03-04-2015}
        Loading partition {txn_date= 01-01-2015}
        Loading partition {txn_date=01-01-2015}
        Loading partition {txn_date= 01-02-2015}
         Time taken for adding to write entity : 5
    Partition oct19.dyn_part{txn_date= 01-01-2015} stats: [numFiles=1, numRows=1, totalSize=25, rawDataSize=24]
    Partition oct19.dyn_part{txn_date= 01-02-2015} stats: [numFiles=1, numRows=1, totalSize=25, rawDataSize=24]
    Partition oct19.dyn_part{txn_date= 03-04-2015} stats: [numFiles=1, numRows=2, totalSize=50, rawDataSize=48]
    Partition oct19.dyn_part{txn_date=01-01-2015} stats: [numFiles=1, numRows=1, totalSize=26, rawDataSize=25]
    Partition oct19.dyn_part{txn_date=01-02-2015} stats: [numFiles=1, numRows=1, totalSize=26, rawDataSize=25]
    Partition oct19.dyn_part{txn_date=03-04-2015} stats: [numFiles=1, numRows=1, totalSize=26, rawDataSize=25]
    Partition oct19.dyn_part{txn_date=04-02-2015} stats: [numFiles=1, numRows=1, totalSize=25, rawDataSize=24]
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1   Cumulative CPU: 4.03 sec   HDFS Read: 4166 HDFS Write: 614 SUCCESS
    Total MapReduce CPU Time Spent: 4 seconds 30 msec
    
  • ryeguy
    ryeguy about 15 years
    Are you really sure about the last sentence? Look at this example - msdn.microsoft.com/en-us/library/dxkwh6zw.aspx there is no protection for overreading. The reads just get joined together when the read returns 0 bytes. Buffer size is a const=1024.
  • casperOne
    casperOne about 15 years
    @ryeguy: I didn't word that right at all. Edited to be more clear.
  • Remus Rusanu
    Remus Rusanu about 14 years
    You are on the right track, but you also have to consider that your producer puts an entire buffer into the list for each completed receive. There are two scenarios that will ruin you: 1) a slow WAN trickle that fills only 10 bytes of each buffer, so you exhaust your memory before the first frame is complete, and 2) a fast sender that overwhelms a slow receiver, because the receiver accepts everything before it's processed. For 1) you need to coalesce sparsely filled buffers. For 2) you need TCP flow control: stop posting buffers when PacketList grows too big.
  • Remus Rusanu
    Remus Rusanu about 14 years
    Missed that your packet copies out only the filled buffer, so my objection 1) no longer applies (although now I would object about the copy...), but that still leaves 2)
  • sura
    sura about 13 years
    I tried this method, but in my case EndReceive never returns zero bytes. Any reason for that?
  • Jon Skeet
    Jon Skeet about 13 years
    @sura: Perhaps the other end of the network connection never closed the connection?
  • Şafak Gür
    Şafak Gür almost 12 years
    So you've built Task.FromAsync before TPL was created. Gratz!
  • FaizanHussainRabbani
    FaizanHussainRabbani about 8 years
    @JonSkeet If I have a persistent connection and partial messages are received, will EndReceive be able to return 0 once a partial message completes?
  • Jon Skeet
    Jon Skeet about 8 years
    @FaizanRabbani: What do you mean by "partial message"? EndReceive will only return 0 at the end of the stream, as far as I'm aware.
  • FaizanHussainRabbani
    FaizanHussainRabbani about 8 years
    @JonSkeet By partial message I mean this: suppose the letters A-Z are supposed to be transmitted, but due to some network problem only A to M gets through, and then after a while, let's say 100 ms, N to Z is transmitted.
  • Jon Skeet
    Jon Skeet about 8 years
    @FaizanRabbani: Right, then the first EndReceive will return 13 (assuming it doesn't delay to wait to receive more) and then if you call BeginReceive again, the next EndReceive will return 13 for the second part. Remember that streams are not message-oriented.
  • FaizanHussainRabbani
    FaizanHussainRabbani about 8 years
    @JonSkeet Thank you, it makes sense.
  • David Hoelzer
    David Hoelzer almost 5 years
    It doesn't matter where that comes from, it's naive. If you are implementing a TCP based protocol handler based on an RFC, you will rarely find that there is some kind of delimiter. Instead, you often must rely on intrinsic length and offset indicators within the application protocol itself.
  • Priyanka
    Priyanka over 4 years
    Thank you. I have used trim, it worked perfectly. Can you please explain the regexp_replace function with the parameters which you told me to use.
  • leftjoin
    leftjoin over 4 years
    @Priyanka Explained. regexp_replace will convert the date to the proper format, yyyy-MM-dd. With your current date format it is not possible to use predicates with > or <, like this: where txn_date <= some_date. The date should be in the yyyy-MM-dd format.
  • leftjoin
    leftjoin over 4 years
    @Priyanka Currently your dates are not sortable or comparable. For example, 10-01-2018 compares as greater than 01-01-2019, which is wrong.