Adding OpenCV processing to a GStreamer application

21,368

Solution 1

After some searching, I found a solution that retrieves the data directly from the buffer. The idea is to create a playbin and set an appsink as its "video-sink". Here is a code sample:

cout<<"Creating appsink"<<endl;
appsink = gst_element_factory_make("appsink", "asink");
gst_app_sink_set_emit_signals((GstAppSink*)appsink, true);
gst_app_sink_set_drop((GstAppSink*)appsink, true);
gst_app_sink_set_max_buffers((GstAppSink*)appsink, 1);

//creating and initialising pipeline

g_object_set(G_OBJECT(playbin), "video-sink", appsink, NULL);

g_signal_connect(appsink, "new-buffer", G_CALLBACK(DisplayFrame), (gpointer) mark);

//callback function looks like this
//(it must be a static member function or a free function to be used with G_CALLBACK)

gboolean Core::DisplayFrame(GstAppSink *fks, gpointer mark)
{
static bool init = false;
static IplImage *frame;
GstBuffer* buf;
Mark* mk = (Mark*) mark;

if(!init)
{
    init = true;
    //header only: the pixel data comes from the GStreamer buffer below
    frame = cvCreateImageHeader(cvSize(mk->frame_w, mk->frame_h), IPL_DEPTH_8U, 1);
}
buf = gst_app_sink_pull_buffer(fks);
frame->imageData = (char*)GST_BUFFER_DATA(buf);

ProcessFrame(frame);
gst_buffer_unref(buf);
return true;
}

This works. P.S. There is a lot of information about this method, but it took me a long time to find it, so I decided to post it here to provide at least some keywords to search for.

UPDATE: A bit more information about connecting GStreamer and OpenCV, this time about converting the buffer to an IplImage. First, we need to receive an RGB buffer to make the conversion as easy as possible. To do this, we replace the bare appsink with a bin that contains ffmpegcolorspace linked to an appsink:

cout<<"Creating appsink"<<endl;
appsink = gst_element_factory_make("appsink", "asink");
gst_app_sink_set_emit_signals((GstAppSink*)appsink, true);
gst_app_sink_set_drop((GstAppSink*)appsink, true);
gst_app_sink_set_max_buffers((GstAppSink*)appsink, 1);
csp = gst_element_factory_make("ffmpegcolorspace", "csp");
sinkpipe = gst_pipeline_new("sinkp");
gst_bin_add_many(GST_BIN(sinkpipe), csp, appsink, NULL);
gst_element_link_filtered(csp, appsink, gst_caps_new_simple("video/x-raw-rgb", NULL));
pad = gst_element_get_static_pad(csp, "sink");
gst_element_add_pad(sinkpipe, gst_ghost_pad_new("ghost", pad));
g_object_unref(pad);

//...

g_object_set(G_OBJECT(playbin), "video-sink", sinkpipe, NULL);

//...

g_signal_connect(appsink, "new-buffer", G_CALLBACK(GetFrame), (gpointer) mark);

//...

//caps_struct can be retrieved by writing a data probe
//search for it in the GStreamer manual

cout<<"Getting frame resolution"<<endl;
gst_structure_get_int(caps_struct, "width", &(mark->frame_w));
gst_structure_get_int(caps_struct, "height", &(mark->frame_h));
gst_structure_get_int(caps_struct, "depth", &depth);

mark->GeneratePoints();
frame = cvCreateImage(cvSize(mark->frame_w, mark->frame_h), depth/3, 3);


//callback function

gboolean Core::GetFrame(GstAppSink *fks, gpointer frame)
{

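//the user data registered with g_signal_connect must be the IplImage
//created above for this cast to be valid
IplImage* frame_temp = (IplImage*) frame;
//header only: the pixel data stays in the GStreamer buffer
IplImage* frame_temp_two = cvCreateImageHeader(cvGetSize(frame_temp), frame_temp->depth, frame_temp->nChannels);

GstBuffer* buf;
buf = gst_app_sink_pull_buffer(fks);
frame_temp_two->imageData = (char*) GST_BUFFER_DATA(buf);
cvConvertImage(frame_temp_two, frame_temp, CV_CVTIMG_SWAP_RB);
ProcessFrame(frame_temp);
cvReleaseImageHeader(&frame_temp_two);
gst_buffer_unref(buf);
return true;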
}
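
The red/blue swap that cvConvertImage performs with CV_CVTIMG_SWAP_RB in the callback above can be illustrated without OpenCV. This is only a minimal sketch, assuming a packed 24-bit buffer whose rows may be padded; the function name swap_red_blue and its parameters are hypothetical and not part of OpenCV or GStreamer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>

// Swap the first and third byte of every pixel in a packed 24-bit image,
// in place (RGB becomes BGR and vice versa).
// `stride` is the number of bytes per row; it may be larger than width * 3
// when rows are padded, and the padding bytes are left untouched.
void swap_red_blue(std::uint8_t* data, int width, int height, std::size_t stride)
{
    assert(data != nullptr);
    assert(width >= 0 && height >= 0);
    assert(stride >= static_cast<std::size_t>(width) * 3);

    for (int y = 0; y < height; ++y) {
        std::uint8_t* row = data + static_cast<std::size_t>(y) * stride;
        for (int x = 0; x < width; ++x) {
            std::swap(row[x * 3], row[x * 3 + 2]);
        }
    }
}
```

Unlike the callback above, this version modifies the buffer in place, so it would only be appropriate on a writable copy of the frame.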

I hope this will help somebody.

Solution 2

Here is my complete source code solution for GStreamer 1.4.0 and OpenCV 2.4.9.

It uses gst_parse_launch() to parse the same pipeline description you would pass to gst-launch. The GStreamer pipeline converts frames to RGB888 before handing them to OpenCV so that the conversion is as easy as possible.
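
As a rough illustration of what the RGB888 layout means on the OpenCV side, here is a minimal sketch. It assumes each row of a packed RGB frame is padded to a multiple of 4 bytes, which as far as I know is GStreamer's default for video/x-raw RGB; treat that as an assumption to verify against your caps. The helper names padded_rgb_stride and pack_rows are hypothetical and not part of GStreamer or OpenCV.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Bytes per row for packed 24-bit RGB, assuming rows are padded up to a
// 4-byte boundary (an assumption about the producer, see the text above).
std::size_t padded_rgb_stride(int width)
{
    assert(width > 0);
    const std::size_t row_bytes = static_cast<std::size_t>(width) * 3;
    return (row_bytes + 3) & ~static_cast<std::size_t>(3);
}

// Copy a frame with padded rows into a tightly packed buffer
// (exactly width * 3 bytes per row), dropping the padding bytes.
std::vector<std::uint8_t> pack_rows(const std::uint8_t* src, int width,
                                    int height, std::size_t stride)
{
    assert(src != nullptr && width > 0 && height > 0);
    const std::size_t row_bytes = static_cast<std::size_t>(width) * 3;
    assert(stride >= row_bytes);

    std::vector<std::uint8_t> packed(row_bytes * static_cast<std::size_t>(height));
    for (int y = 0; y < height; ++y) {
        const std::size_t row = static_cast<std::size_t>(y);
        std::memcpy(packed.data() + row * row_bytes, src + row * stride, row_bytes);
    }
    return packed;
}
```

Under this assumption a 320-pixel-wide frame has a stride of 960 bytes, i.e. no padding, which is why Mat::AUTO_STEP is sufficient for that size. For widths that are not a multiple of 4, the real stride would have to be passed as the Mat step, or the rows packed first.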

OpenCV frame processing is not done in the new_sample() callback. The callback only grabs the frame from GStreamer and pushes it onto a queue, which is then consumed in the main thread. This way we may call e.g. imshow() from OpenCV to actually render the image to the screen.

It is about 150 lines; removing debug prints etc. could reduce it to under 100 lines of code.

Thread synchronization should probably be added around the deque reads and writes.

#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <stdlib.h>
#include <deque>

#include "opencv2/opencv.hpp"
using namespace cv;

// TODO: use synchronized deque
std::deque<Mat> frameQueue;

GstFlowReturn
new_preroll(GstAppSink *appsink, gpointer data) {
  g_print ("Got preroll!\n");
  return GST_FLOW_OK;
}

GstFlowReturn
new_sample(GstAppSink *appsink, gpointer data) {
  static int framecount = 0;
  framecount++;

  GstSample *sample = gst_app_sink_pull_sample(appsink);
  GstCaps *caps = gst_sample_get_caps(sample);
  GstBuffer *buffer = gst_sample_get_buffer(sample);
  const GstStructure *info = gst_sample_get_info(sample);

  // ---- Read frame and convert to opencv format ---------------

  GstMapInfo map;
  gst_buffer_map (buffer, &map, GST_MAP_READ);

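  // convert gstreamer data to OpenCV Mat, you could actually
  // resolve height / width from caps...
  Mat frame(Size(320, 240), CV_8UC3, (char*)map.data, Mat::AUTO_STEP);
  int frameSize = map.size;

  // TODO: synchronize this....
  // clone() copies the pixels; without it the queued Mat would point
  // into the buffer that is unmapped and released below
  frameQueue.push_back(frame.clone());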

  gst_buffer_unmap(buffer, &map);

  // ------------------------------------------------------------

  // print dot every 30 frames
  if (framecount%30 == 0) {
    g_print (".");
  }

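  // show caps on first frame
  if (framecount == 1) {
    // gst_caps_to_string() returns a newly allocated string
    gchar *caps_str = gst_caps_to_string(caps);
    g_print ("%s\n", caps_str);
    g_free (caps_str);
  }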

  gst_sample_unref (sample);
  return GST_FLOW_OK;
}

static gboolean
my_bus_callback (GstBus *bus, GstMessage *message, gpointer data) {
  g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));
  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR: {
      GError *err;
      gchar *debug;

      gst_message_parse_error (message, &err, &debug);
      g_print ("Error: %s\n", err->message);
      g_error_free (err);
      g_free (debug);    
      break;
    }
    case GST_MESSAGE_EOS:
      /* end-of-stream */
      break;
    default:
      /* unhandled message */
      break;
  }
  /* we want to be notified again the next time there is a message
   * on the bus, so returning TRUE (FALSE means we want to stop watching
   * for messages on the bus and our callback should not be called again)
   */
  return TRUE;
}

int
main (int argc, char *argv[])
{
  GError *error = NULL;

  gst_init (&argc, &argv);

  gchar *descr = g_strdup(
    "videotestsrc  pattern=ball ! "
    "video/x-raw,format=RGB ! "
    "videoconvert ! "
    "appsink name=sink sync=true"
  );
  GstElement *pipeline = gst_parse_launch (descr, &error);

  if (error != NULL) {
    g_print ("could not construct pipeline: %s\n", error->message);
    g_error_free (error);
    exit (-1);
  }

  /* get sink */
  GstElement *sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");

  gst_app_sink_set_emit_signals((GstAppSink*)sink, true);
  gst_app_sink_set_drop((GstAppSink*)sink, true);
  gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);
  GstAppSinkCallbacks callbacks = { NULL, new_preroll, new_sample };
  gst_app_sink_set_callbacks (GST_APP_SINK(sink), &callbacks, NULL, NULL);

  GstBus *bus;
  guint bus_watch_id;
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  bus_watch_id = gst_bus_add_watch (bus, my_bus_callback, NULL);
  gst_object_unref (bus);

  gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);

  namedWindow("edges",1);
  while(1) {
    g_main_iteration(false);

      // TODO: synchronize...
    if (frameQueue.size() > 0) {
      // this lags pretty badly even when grabbing frames from webcam
      Mat frame = frameQueue.front();
      Mat edges;
      cvtColor(frame, edges, CV_RGB2GRAY);
      GaussianBlur(edges, edges, Size(7,7), 1.5, 1.5);
      Canny(edges, edges, 0, 30, 3);
      imshow("edges", edges);
      cv::waitKey(30);
      frameQueue.clear();
    }
  }  

  gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
  gst_object_unref (GST_OBJECT (pipeline));

  return 0;  
}
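
The "TODO: synchronize" notes in the program above could be addressed along these lines. This is a minimal sketch, not the author's implementation: FrameQueue is a hypothetical helper built only on the standard library, and it assumes a single mutex is an acceptable cost at video frame rates. It drops the oldest frame when full, roughly mirroring the drop=true / max-buffers=1 settings on the appsink.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical helper: a small bounded queue that can be shared between
// the GStreamer streaming thread (producer) and the main thread (consumer).
template <typename Frame>
class FrameQueue {
public:
    explicit FrameQueue(std::size_t capacity) : capacity_(capacity)
    {
        assert(capacity > 0);
    }

    // Producer side: add a frame, discarding the oldest one when full.
    void push(Frame frame)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (frames_.size() == capacity_) {
            frames_.pop_front();
        }
        frames_.push_back(std::move(frame));
    }

    // Consumer side: take the oldest waiting frame.
    // Returns false (and leaves `out` untouched) when the queue is empty.
    bool try_pop(Frame& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (frames_.empty()) {
            return false;
        }
        out = std::move(frames_.front());
        frames_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<Frame> frames_;
    std::size_t capacity_;
};
```

With cv::Mat as the Frame type, the callback would push a deep copy of the mapped data and the main loop would call try_pop() instead of reading frameQueue directly. Copying a cv::Mat header alone does not copy the pixels, so the deep copy still has to happen before the buffer is unmapped.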

To compile on OS X / Linux, a Makefile like this should do:

GST_FLAGS=$(shell pkg-config --cflags --libs gstreamer-gl-1.0 gstreamer-tag-1.0 gstreamer-net-1.0 gstreamer-sdp-1.0 \
  gstreamer-1.0 gstreamer-allocators-1.0 gstreamer-insertbin-1.0 gstreamer-plugins-base-1.0 \
  gstreamer-codecparsers-1.0 gstreamer-base-1.0 gstreamer-app-1.0 gstreamer-check-1.0 \
  gstreamer-controller-1.0 gstreamer-video-1.0 gstreamer-fft-1.0 gstreamer-mpegts-1.0 \
  gstreamer-pbutils-1.0 gstreamer-rtp-1.0 gstreamer-rtsp-1.0 \
  gstreamer-riff-1.0 gstreamer-audio-1.0 gstreamer-plugins-bad-1.0 opencv)

OPENCV_FLAGS=$(shell pkg-config --cflags --libs opencv)

all: gst_opencv

# assumes the source file is named gst_opencv.cpp; recipe lines must start with a tab
gst_opencv: gst_opencv.cpp
	g++ gst_opencv.cpp -o gst_opencv $(GST_FLAGS) $(OPENCV_FLAGS)

clean:
	rm -f gst_opencv

Solution 3

I can't comment on Mikael's post (not enough points for that). I made some modifications and corrections to the program; you can also find it in my gist: https://gist.github.com/patrickelectric/5dca1cb7cef4ffa7fbb6fb70dd9f9edc

/**
* Based on:
* https://stackoverflow.com/questions/10403588/adding-opencv-processing-to-gstreamer-application
*/

// Include atomic std library
#include <atomic>

// Include gstreamer library
#include <gst/gst.h>
#include <gst/app/app.h>

// Include OpenCV library
#include <opencv2/opencv.hpp>

// Share frame between main loop and gstreamer callback
std::atomic<cv::Mat*> atomicFrame;

/**
* @brief Check preroll to get a new frame using callback
*  https://gstreamer.freedesktop.org/documentation/design/preroll.html
* @return GstFlowReturn
*/
GstFlowReturn new_preroll(GstAppSink* /*appsink*/, gpointer /*data*/)
{
    return GST_FLOW_OK;
}

/**
* @brief This is a callback that gets a new frame when a preroll exists
*
* @param appsink
* @return GstFlowReturn
*/
GstFlowReturn new_sample(GstAppSink *appsink, gpointer /*data*/)
{
    static int framecount = 0;

    // Get caps and frame
    GstSample *sample = gst_app_sink_pull_sample(appsink);
    GstCaps *caps = gst_sample_get_caps(sample);
    GstBuffer *buffer = gst_sample_get_buffer(sample);
    GstStructure *structure = gst_caps_get_structure(caps, 0);
    const int width = g_value_get_int(gst_structure_get_value(structure, "width"));
    const int height = g_value_get_int(gst_structure_get_value(structure, "height"));

    // Print dot every 30 frames
    if(!(framecount%30)) {
        g_print(".");
    }

    // Show caps on first frame
    if(!framecount) {
        g_print("caps: %s\n", gst_caps_to_string(caps));
    }
    framecount++;

    // Get frame data
    GstMapInfo map;
    gst_buffer_map(buffer, &map, GST_MAP_READ);

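    // Convert gstreamer data to OpenCV Mat.
    // clone() copies the pixels so the Mat stays valid after the buffer is unmapped.
    cv::Mat* prevFrame;
    prevFrame = atomicFrame.exchange(new cv::Mat(cv::Mat(cv::Size(width, height), CV_8UC3, (char*)map.data, cv::Mat::AUTO_STEP).clone()));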
    if(prevFrame) {
        delete prevFrame;
    }

    gst_buffer_unmap(buffer, &map);
    gst_sample_unref(sample);

    return GST_FLOW_OK;
}

/**
* @brief Bus callback
*  Print important messages
*
* @param bus
* @param message
* @param data
* @return gboolean
*/
static gboolean my_bus_callback(GstBus *bus, GstMessage *message, gpointer data)
{
    // Debug message
    //g_print("Got %s message\n", GST_MESSAGE_TYPE_NAME(message));
    switch(GST_MESSAGE_TYPE(message)) {
        case GST_MESSAGE_ERROR: {
            GError *err;
            gchar *debug;

            gst_message_parse_error(message, &err, &debug);
            g_print("Error: %s\n", err->message);
            g_error_free(err);
            g_free(debug);
            break;
        }
        case GST_MESSAGE_EOS:
            /* end-of-stream */
            break;
        default:
            /* unhandled message */
            break;
    }
    /* we want to be notified again the next time there is a message
    * on the bus, so returning TRUE (FALSE means we want to stop watching
    * for messages on the bus and our callback should not be called again)
    */
    return true;
}

int main(int argc, char *argv[]) {
    gst_init(&argc, &argv);

    gchar *descr = g_strdup(
        "udpsrc port=5600 "
        "! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264 "
        "! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert "
        "! appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true"
    );

    // Check pipeline
    GError *error = nullptr;
    GstElement *pipeline = gst_parse_launch(descr, &error);

    if(error) {
        g_print("could not construct pipeline: %s\n", error->message);
        g_error_free(error);
        exit(-1);
    }

    // Get sink
    GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");

    /**
    * @brief Get sink signals and check for a preroll
    *  If preroll exists, we do have a new frame
    */
    gst_app_sink_set_emit_signals((GstAppSink*)sink, true);
    gst_app_sink_set_drop((GstAppSink*)sink, true);
    gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);
    GstAppSinkCallbacks callbacks = { nullptr, new_preroll, new_sample };
    gst_app_sink_set_callbacks(GST_APP_SINK(sink), &callbacks, nullptr, nullptr);

    // Declare bus
    GstBus *bus;
    bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    gst_bus_add_watch(bus, my_bus_callback, nullptr);
    gst_object_unref(bus);

    gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);

    // Main loop
    while(1) {
        g_main_iteration(false);

        cv::Mat* frame = atomicFrame.load();
        if(frame) {
            // Use the pointer that was just checked instead of loading it again
            cv::imshow("Frame", *frame);
            cv::waitKey(30);
        }
    }

    gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
    gst_object_unref(GST_OBJECT(pipeline));
    return 0;
}
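
One caveat with the std::atomic<cv::Mat*> exchange above: the callback deletes the previous frame while the main loop may still be displaying it. A possible alternative, shown here only as a sketch under the assumption that a mutex-protected std::shared_ptr is acceptable, keeps a frame alive for as long as any reader holds it. LatestFrame is a hypothetical helper and not part of the gist, GStreamer, or OpenCV.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical helper: holds only the most recent frame. Readers receive
// shared ownership, so a frame is destroyed only after the slot has moved
// on to a newer frame AND every reader has released its pointer.
template <typename Frame>
class LatestFrame {
public:
    // Producer (streaming thread): publish a frame, replacing any older one.
    void store(Frame frame)
    {
        auto next = std::make_shared<const Frame>(std::move(frame));
        std::lock_guard<std::mutex> lock(mutex_);
        current_ = std::move(next);
    }

    // Consumer (main thread): get the newest frame, or a null pointer
    // if nothing has been published yet.
    std::shared_ptr<const Frame> load() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return current_;
    }

private:
    mutable std::mutex mutex_;
    std::shared_ptr<const Frame> current_;
};
```

With cv::Mat as the Frame type, new_sample would call store() with a deep copy of the mapped data, and the main loop would keep the pointer returned by load() for the duration of the imshow() call.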
Author by

Roman

Updated on June 08, 2020

Comments

  • Roman
    Roman almost 4 years

    I'm trying to do the following: receive a video stream using GStreamer and process it with OpenCV. I've found a few solutions, and one of them is to write the video (from GStreamer) into a FIFO and then read it using OpenCV (OPTION3 here: MJPEG streaming and decoding). The problem is that I can't open the pipe: cvCreateFileCapture just never returns. Here is part of the code I wrote:

    if(mkfifo("fifo.avi", S_IRUSR| S_IWUSR) == -1)
    {
        cout<<"Cant create fifo"<<endl;
        cout<<errno<<endl;
    }
    
    loop = g_main_loop_new(NULL, false);
    
    fsink = gst_element_factory_make("filesink", "fsink");
    g_object_set(G_OBJECT(fsink), "location", "fifo.avi", NULL);
    
    playbin = gst_element_factory_make("playbin2", "play");    
    g_object_set(G_OBJECT(playbin), "uri", uri.c_str(), NULL);
    g_object_set(G_OBJECT(playbin), "video-sink", fsink, NULL);
    
    bus = gst_pipeline_get_bus(GST_PIPELINE(playbin));
    gst_bus_add_signal_watch(bus);
    
    g_signal_connect(bus, "message::buffering", G_CALLBACK(&set_playbin_state), playbin);
    gst_object_unref(bus);
    
    cvNamedWindow("output", CV_WINDOW_AUTOSIZE);
    capture = cvCreateFileCapture("fifo.avi");
    

    The program hangs on the last line. PS: I'm using OpenCV 2.3.1.

  • Tim Diekmann
    Tim Diekmann almost 6 years
    In the future, please include all relevant code in your post and don't just include a link to a code hosting site. Your post should stand alone from any other resource; consider what would happen if that site went down in the future!
  • p.deman
    p.deman almost 4 years
    I am trying your code, but when I run it I get "no element "udpsrc"". I tried adding gstudp.dll to the same folder as my .exe, but I get the same error. For some reason it doesn't link with it.
  • p.deman
    p.deman almost 4 years
    When I try your solution, despite the gst_app_sink_set_drop((GstAppSink*)appsink, true); gst_app_sink_set_max_buffers((GstAppSink*)appsink, 1); calls, it doesn't drop the unprocessed frames; it keeps a queue. I'd like it to drop all unprocessed frames. How can I do that?