gst-convenience: Change GstDiscoverer namespace
[gupnp:gupnp-dlna.git] / gst-convenience / gst-libs / gst / discoverer / gstdiscoverer.c
1 /* GStreamer
2  * Copyright (C) 2009 Edward Hervey <edward.hervey@collabora.co.uk>
3  *               2009 Nokia Corporation
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:gstdiscoverer
23  * @short_description: Utility for discovering information on URIs.
24  *
25  * The #GstDiscovererInternal is a utility object which allows to get as much
26  * information as possible from one or many URIs.
27  *
28  * It provides two APIs, allowing usage in blocking or non-blocking mode.
29  *
30  * The blocking mode just requires calling @gst_discoverer_discover_uri
31  * with the URI one wishes to discover.
32  *
33  * The non-blocking mode requires a running #GMainLoop in the default
34  * #GMainContext, where one connects to the various signals, appends the
35  * URIs to be processed (through @gst_discoverer_append_uri) and then
36  * asks for the discovery to begin (through @gst_discoverer_start).
37  *
38  * The information #GstStructure contains the fllowing information:
39  * <variablelist>
40  *  <varlistentry>
41  *    <term>'duration'</term>
42  *    <listitem>The duration in nanoseconds. May not be present in case of
43  *    errors.</listitem>
44  *  </varlistentry>
45  * </variablelist>
46  *
47  * Since 0.10.26
48  */
49 #ifdef HAVE_CONFIG_H
50 #include "config.h"
51 #endif
52
53
54 #include "gstdiscoverer.h"
55 #include "gstdiscoverer-marshal.h"
56
57 GST_DEBUG_CATEGORY_STATIC (discoverer_debug);
58 #define GST_CAT_DEFAULT discoverer_debug
59
60 static GQuark _INFORMATION_QUARK;
61 static GQuark _CAPS_QUARK;
62 static GQuark _TAGS_QUARK;
63 static GQuark _MISSING_PLUGIN_QUARK;
64 static GQuark _STREAM_TOPOLOGY_QUARK;
65 static GQuark _TOPOLOGY_PAD_QUARK;
66
67 typedef struct
68 {
69   GstDiscovererInternal *dc;
70   GstPad *pad;
71   GstElement *queue;
72   GstElement *sink;
73   GstTagList *tags;
74 } PrivateStream;
75
76 #define DISCO_LOCK(dc) g_mutex_lock (dc->lock);
77 #define DISCO_UNLOCK(dc) g_mutex_unlock (dc->lock);
78
79 static void
80 _do_init (void)
81 {
82   GST_DEBUG_CATEGORY_INIT (discoverer_debug, "discoverer", 0, "DiscovererInternal");
83
84   _INFORMATION_QUARK = g_quark_from_string ("information");
85   _CAPS_QUARK = g_quark_from_string ("caps");
86   _TAGS_QUARK = g_quark_from_string ("tags");
87   _MISSING_PLUGIN_QUARK = g_quark_from_string ("missing-plugin");
88   _STREAM_TOPOLOGY_QUARK = g_quark_from_string ("stream-topology");
89   _TOPOLOGY_PAD_QUARK = g_quark_from_string ("pad");
90 };
91
92 G_DEFINE_TYPE_EXTENDED (GstDiscovererInternal, gst_discoverer_internal, G_TYPE_OBJECT, 0,
93     _do_init ());
94
95 enum
96 {
97   SIGNAL_READY,
98   SIGNAL_STARTING,
99   SIGNAL_DISCOVERED,
100   LAST_SIGNAL
101 };
102
103 #define DEFAULT_PROP_TIMEOUT 15 * GST_SECOND
104
105 enum
106 {
107   PROP_0,
108   PROP_TIMEOUT
109 };
110
111 static guint gst_discoverer_signals[LAST_SIGNAL] = { 0 };
112
113 static void gst_discoverer_set_timeout (GstDiscovererInternal * dc,
114     GstClockTime timeout);
115
116 static void discoverer_bus_cb (GstBus * bus, GstMessage * msg,
117     GstDiscovererInternal * dc);
118 static void uridecodebin_pad_added_cb (GstElement * uridecodebin, GstPad * pad,
119     GstDiscovererInternal * dc);
120 static void uridecodebin_pad_removed_cb (GstElement * uridecodebin,
121     GstPad * pad, GstDiscovererInternal * dc);
122
123 static void gst_discoverer_dispose (GObject * dc);
124 static void gst_discoverer_set_property (GObject * object, guint prop_id,
125     const GValue * value, GParamSpec * pspec);
126 static void gst_discoverer_get_property (GObject * object, guint prop_id,
127     GValue * value, GParamSpec * pspec);
128
129 static void
130 gst_discoverer_internal_class_init (GstDiscovererInternalClass * klass)
131 {
132   GObjectClass *gobject_class = (GObjectClass *) klass;
133
134   gobject_class->dispose = gst_discoverer_dispose;
135
136   gobject_class->set_property = gst_discoverer_set_property;
137   gobject_class->get_property = gst_discoverer_get_property;
138
139   /* properties */
140   /**
141    * GstDiscovererInternal:timeout
142    *
143    * The duration (in nanoseconds) after which the discovery of an individual
144    * URI will timeout.
145    *
146    * If the discovery of a URI times out, the @GST_DISCOVERER_TIMEOUT will be
147    * set on the result flags.
148    */
149   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
150       g_param_spec_uint64 ("timeout", "timeout", "Timeout",
151           GST_SECOND, 3600 * GST_SECOND, DEFAULT_PROP_TIMEOUT,
152           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
153
154   /* signals */
155   /**
156    * GstDiscovererInternal::ready:
157    * @discoverer: the #GstDiscovererInternal
158    *
159    * Will be emitted when all pending URIs have been processed.
160    */
161   gst_discoverer_signals[SIGNAL_READY] =
162       g_signal_new ("ready", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
163       G_STRUCT_OFFSET (GstDiscovererInternalClass, ready),
164       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
165
166   /**
167    * GstDiscovererInternal::starting:
168    * @discoverer: the #GstDiscovererInternal
169    *
170    * Will be emitted when the discover starts analyzing the pending URIs
171    */
172   gst_discoverer_signals[SIGNAL_STARTING] =
173       g_signal_new ("starting", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
174       G_STRUCT_OFFSET (GstDiscovererInternalClass, starting),
175       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
176
177   /**
178    * GstDiscovererInternal::discovered:
179    * @discoverer: the #GstDiscovererInternal
180    * @info: the results #GstDiscovererInformation
181    * @error: (type GLib.Error): #GError, which will be non-NULL if an error
182    *                            occured during discovery
183    *
184    * Will be emitted when all information on a URI could be discovered.
185    */
186   gst_discoverer_signals[SIGNAL_DISCOVERED] =
187       g_signal_new ("discovered", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
188       G_STRUCT_OFFSET (GstDiscovererInternalClass, discovered),
189       NULL, NULL, __gst_discoverer_marshal_VOID__BOXED_BOXED,
190       G_TYPE_NONE, 2, GST_TYPE_DISCOVERER_INFORMATION, GST_TYPE_G_ERROR);
191 }
192
193 #if (GST_CHECK_VERSION(0,10,26) || (GST_VERSION_MICRO == 25 && GST_VERSION_NANO >= 1))
194 static void
195 uridecodebin_element_added_cb (GstElement * uridecodebin,
196     GstElement * child, GstDiscovererInternal * dc)
197 {
198   GST_DEBUG ("New element added to uridecodebin : %s",
199       GST_ELEMENT_NAME (child));
200
201   if (G_OBJECT_TYPE (child) == dc->decodebin2_type) {
202     g_object_set (child, "post-stream-topology", TRUE, NULL);
203   }
204 }
205 #endif
206
207 static void
208 gst_discoverer_internal_init (GstDiscovererInternal * dc)
209 {
210 #if (GST_CHECK_VERSION(0,10,26) || (GST_VERSION_MICRO == 25 && GST_VERSION_NANO >= 1))
211   GstElement *tmp;
212 #endif
213
214   dc->timeout = DEFAULT_PROP_TIMEOUT;
215   dc->async = FALSE;
216
217   dc->lock = g_mutex_new ();
218
219   GST_LOG ("Creating pipeline");
220   dc->pipeline = (GstBin *) gst_pipeline_new ("DiscovererInternal");
221   GST_LOG_OBJECT (dc, "Creating uridecodebin");
222   dc->uridecodebin =
223       gst_element_factory_make ("uridecodebin", "discoverer-uri");
224   GST_LOG_OBJECT (dc, "Adding uridecodebin to pipeline");
225   gst_bin_add (dc->pipeline, dc->uridecodebin);
226
227   g_signal_connect (dc->uridecodebin, "pad-added",
228       G_CALLBACK (uridecodebin_pad_added_cb), dc);
229   g_signal_connect (dc->uridecodebin, "pad-removed",
230       G_CALLBACK (uridecodebin_pad_removed_cb), dc);
231
232   GST_LOG_OBJECT (dc, "Getting pipeline bus");
233   dc->bus = gst_pipeline_get_bus ((GstPipeline *) dc->pipeline);
234
235   g_signal_connect (dc->bus, "message", G_CALLBACK (discoverer_bus_cb), dc);
236
237   GST_DEBUG_OBJECT (dc, "Done initializing DiscovererInternal");
238
239 #if (GST_CHECK_VERSION(0,10,26) || (GST_VERSION_MICRO == 25 && GST_VERSION_NANO >= 1))
240   /* This is ugly. We get the GType of decodebin2 so we can quickly detect
241    * when a decodebin2 is added to uridecodebin so we can set the
242    * post-stream-topology setting to TRUE */
243   g_signal_connect (dc->uridecodebin, "element-added",
244       G_CALLBACK (uridecodebin_element_added_cb), dc);
245   tmp = gst_element_factory_make ("decodebin2", NULL);
246   dc->decodebin2_type = G_OBJECT_TYPE (tmp);
247   g_object_unref (tmp);
248 #endif
249 }
250
251 static void
252 discoverer_reset (GstDiscovererInternal * dc)
253 {
254   GST_DEBUG_OBJECT (dc, "Resetting");
255
256   if (dc->pending_uris) {
257     g_list_foreach (dc->pending_uris, (GFunc) g_free, NULL);
258     g_list_free (dc->pending_uris);
259     dc->pending_uris = NULL;
260   }
261
262   gst_element_set_state ((GstElement *) dc->pipeline, GST_STATE_NULL);
263 }
264
265 static void
266 gst_discoverer_dispose (GObject * obj)
267 {
268   GstDiscovererInternal *dc = (GstDiscovererInternal *) obj;
269
270   GST_DEBUG_OBJECT (dc, "Disposing");
271
272   discoverer_reset (dc);
273
274   if (G_LIKELY (dc->pipeline)) {
275     /* pipeline was set to NULL in _reset */
276     g_object_unref (dc->pipeline);
277     g_object_unref (dc->bus);
278     dc->pipeline = NULL;
279     dc->uridecodebin = NULL;
280     dc->bus = NULL;
281   }
282
283   if (dc->lock) {
284     g_mutex_free (dc->lock);
285     dc->lock = NULL;
286   }
287 }
288
289 static void
290 gst_discoverer_set_property (GObject * object, guint prop_id,
291     const GValue * value, GParamSpec * pspec)
292 {
293   GstDiscovererInternal *dc = (GstDiscovererInternal *) object;
294
295   switch (prop_id) {
296     case PROP_TIMEOUT:
297       gst_discoverer_set_timeout (dc, g_value_get_uint64 (value));
298       break;
299     default:
300       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301       break;
302   }
303 }
304
305 static void
306 gst_discoverer_get_property (GObject * object, guint prop_id,
307     GValue * value, GParamSpec * pspec)
308 {
309   GstDiscovererInternal *dc = (GstDiscovererInternal *) object;
310
311   switch (prop_id) {
312     case PROP_TIMEOUT:
313       g_value_set_uint64 (value, dc->timeout);
314       break;
315     default:
316       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
317       break;
318   }
319 }
320
321 static void
322 gst_discoverer_set_timeout (GstDiscovererInternal * dc, GstClockTime timeout)
323 {
324   GST_DEBUG_OBJECT (dc, "timeout : %" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
325
326   /* FIXME : update current pending timeout if we're running */
327   DISCO_LOCK (dc);
328   dc->timeout = timeout;
329   DISCO_UNLOCK (dc);
330 }
331
332 static gboolean
333 _event_probe (GstPad * pad, GstEvent * event, PrivateStream * ps)
334 {
335   if (GST_EVENT_TYPE (event) == GST_EVENT_TAG) {
336     GstTagList *tl = NULL;
337
338     gst_event_parse_tag (event, &tl);
339     GST_DEBUG_OBJECT (pad, "tags %" GST_PTR_FORMAT, tl);
340     DISCO_LOCK (ps->dc);
341     ps->tags = gst_tag_list_merge (ps->tags, tl, GST_TAG_MERGE_APPEND);
342     DISCO_UNLOCK (ps->dc);
343   }
344
345   return TRUE;
346 }
347
348 static void
349 uridecodebin_pad_added_cb (GstElement * uridecodebin, GstPad * pad,
350     GstDiscovererInternal * dc)
351 {
352   PrivateStream *ps;
353   GstPad *sinkpad = NULL;
354
355   GST_DEBUG_OBJECT (dc, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
356
357   ps = g_slice_new0 (PrivateStream);
358
359   ps->dc = dc;
360   ps->pad = pad;
361   ps->queue = gst_element_factory_make ("queue", NULL);
362   ps->sink = gst_element_factory_make ("fakesink", NULL);
363   g_object_set (ps->sink, "silent", TRUE, NULL);
364
365   if (G_UNLIKELY (ps->queue == NULL || ps->sink == NULL))
366     goto error;
367
368   g_object_set (ps->queue, "max-size-buffers", 1, NULL);
369
370   gst_bin_add_many (dc->pipeline, ps->queue, ps->sink, NULL);
371
372   if (!gst_element_link (ps->queue, ps->sink))
373     goto error;
374   if (!gst_element_sync_state_with_parent (ps->sink))
375     goto error;
376   if (!gst_element_sync_state_with_parent (ps->queue))
377     goto error;
378
379   sinkpad = gst_element_get_static_pad (ps->queue, "sink");
380   if (sinkpad == NULL)
381     goto error;
382   if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK)
383     goto error;
384   g_object_unref (sinkpad);
385
386   /* Add an event probe */
387   gst_pad_add_event_probe (pad, G_CALLBACK (_event_probe), ps);
388
389   DISCO_LOCK (dc);
390   dc->streams = g_list_append (dc->streams, ps);
391   DISCO_UNLOCK (dc);
392
393   GST_DEBUG_OBJECT (dc, "Done handling pad");
394
395   return;
396
397 error:
398   GST_ERROR_OBJECT (dc, "Error while handling pad");
399   if (sinkpad)
400     g_object_unref (sinkpad);
401   if (ps->queue)
402     g_object_unref (ps->queue);
403   if (ps->sink)
404     g_object_unref (ps->sink);
405   g_free (ps);
406   return;
407 }
408
409 static void
410 uridecodebin_pad_removed_cb (GstElement * uridecodebin, GstPad * pad,
411     GstDiscovererInternal * dc)
412 {
413   GList *tmp;
414   PrivateStream *ps;
415   GstPad *sinkpad;
416
417   GST_DEBUG_OBJECT (dc, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
418
419   /* Find the PrivateStream */
420   DISCO_LOCK (dc);
421   for (tmp = dc->streams; tmp; tmp = tmp->next) {
422     ps = (PrivateStream *) tmp->data;
423     if (ps->pad == pad)
424       break;
425   }
426
427   if (tmp == NULL) {
428     DISCO_UNLOCK (dc);
429     GST_DEBUG ("The removed pad wasn't controlled by us !");
430     return;
431   }
432
433   dc->streams = g_list_delete_link (dc->streams, tmp);
434   DISCO_UNLOCK (dc);
435
436   gst_element_set_state (ps->sink, GST_STATE_NULL);
437   gst_element_set_state (ps->queue, GST_STATE_NULL);
438   gst_element_unlink (ps->queue, ps->sink);
439
440   sinkpad = gst_element_get_static_pad (ps->queue, "sink");
441   gst_pad_unlink (pad, sinkpad);
442   g_object_unref (sinkpad);
443
444   /* references removed here */
445   gst_bin_remove_many (dc->pipeline, ps->sink, ps->queue, NULL);
446
447   if (ps->tags) {
448     gst_tag_list_free (ps->tags);
449   }
450
451   g_slice_free1 (sizeof (PrivateStream), ps);
452
453   GST_DEBUG ("Done handling pad");
454 }
455
456 static GstStructure *
457 collect_stream_information (GstDiscovererInternal * dc, PrivateStream * ps, guint idx)
458 {
459   GstCaps *caps;
460   GstStructure *st;
461   gchar *stname;
462
463   stname = g_strdup_printf ("stream-%02d", idx);
464   st = gst_structure_empty_new (stname);
465   g_free (stname);
466
467   /* Get caps */
468   caps = gst_pad_get_negotiated_caps (ps->pad);
469   if (caps) {
470     GST_DEBUG ("Got caps %" GST_PTR_FORMAT, caps);
471     gst_structure_id_set (st, _CAPS_QUARK, GST_TYPE_CAPS, caps, NULL);
472
473     gst_caps_unref (caps);
474   } else
475     GST_WARNING ("Couldn't get negotiated caps from %s:%s",
476         GST_DEBUG_PAD_NAME (ps->pad));
477   if (ps->tags)
478     gst_structure_id_set (st, _TAGS_QUARK, GST_TYPE_STRUCTURE, ps->tags, NULL);
479
480   return st;
481 }
482
483 /* Parses a set of caps and tags in st and populates a GstStreamInformation
484  * structure (parent, if !NULL, otherwise it allocates one)
485  */
486 static GstStreamInformation *
487 collect_information (GstDiscovererInternal * dc, const GstStructure * st,
488     GstStreamInformation * parent)
489 {
490   GstCaps *caps;
491   GstStructure *caps_st, *tags_st;
492   const gchar *name;
493   int tmp, tmp2;
494   guint utmp;
495   gboolean btmp;
496
497   if (!st || !gst_structure_id_has_field (st, _CAPS_QUARK)) {
498     GST_WARNING ("Couldn't find caps !");
499     if (parent)
500       return parent;
501     else
502       return gst_stream_information_new ();
503   }
504
505   gst_structure_id_get ((GstStructure *) st, _CAPS_QUARK, GST_TYPE_CAPS, &caps,
506       NULL);
507   caps_st = gst_caps_get_structure (caps, 0);
508   name = gst_structure_get_name (caps_st);
509
510   if (g_str_has_prefix (name, "audio/")) {
511     GstStreamAudioInformation *info;
512
513     if (parent)
514       info = (GstStreamAudioInformation *) parent;
515     else {
516       info = gst_stream_audio_information_new ();
517       info->parent.caps = caps;
518     }
519
520     if (gst_structure_get_int (caps_st, "rate", &tmp))
521       info->sample_rate = (guint) tmp;
522
523     if (gst_structure_get_int (caps_st, "channels", &tmp))
524       info->channels = (guint) tmp;
525
526     if (gst_structure_get_int (caps_st, "depth", &tmp))
527       info->depth = (guint) tmp;
528
529     if (gst_structure_id_has_field (st, _TAGS_QUARK)) {
530       gst_structure_id_get ((GstStructure *) st, _TAGS_QUARK,
531           GST_TYPE_STRUCTURE, &tags_st, NULL);
532       if (gst_structure_get_uint (tags_st, GST_TAG_BITRATE, &utmp))
533         info->bitrate = utmp;
534
535       if (gst_structure_get_uint (tags_st, GST_TAG_MAXIMUM_BITRATE, &utmp))
536         info->max_bitrate = utmp;
537
538 #ifdef GST_TAG_HAS_VBR
539       if (gst_structure_get_boolean (tags_st, GST_TAG_HAS_VBR, &btmp))
540         info->is_vbr = btmp;
541 #endif
542
543       /* FIXME: Is it worth it to remove the tags we've parsed? */
544       info->parent.tags = gst_tag_list_merge (info->parent.tags,
545           (GstTagList *) tags_st, GST_TAG_MERGE_REPLACE);
546
547       gst_structure_free (tags_st);
548     }
549
550     return (GstStreamInformation *) info;
551
552   } else if (g_str_has_prefix (name, "video/") ||
553       g_str_has_prefix (name, "image/")) {
554     GstStreamVideoInformation *info;
555     GstVideoFormat format;
556
557     if (parent)
558       info = (GstStreamVideoInformation *) parent;
559     else {
560       info = gst_stream_video_information_new ();
561       info->parent.caps = caps;
562     }
563
564     if (gst_video_format_parse_caps (caps, &format, &tmp, &tmp2)) {
565       info->width = (guint) tmp;
566       info->height = (guint) tmp2;
567       info->format = format;
568     }
569
570     if (gst_structure_get_int (caps_st, "depth", &tmp))
571       info->depth = (guint) tmp;
572
573     if (gst_video_parse_caps_pixel_aspect_ratio (caps, &tmp, &tmp2))
574       gst_value_set_fraction (&info->pixel_aspect_ratio, tmp, tmp2);
575
576     if (gst_video_parse_caps_framerate (caps, &tmp, &tmp2))
577       gst_value_set_fraction (&info->frame_rate, tmp, tmp2);
578
579     if (gst_video_format_parse_caps_interlaced (caps, &btmp))
580       info->interlaced = btmp;
581
582     if (gst_structure_id_has_field (st, _TAGS_QUARK)) {
583       gst_structure_id_get ((GstStructure *) st, _TAGS_QUARK,
584           GST_TYPE_STRUCTURE, &tags_st, NULL);
585       /* FIXME: Is it worth it to remove the tags we've parsed? */
586       info->parent.tags = gst_tag_list_merge (info->parent.tags,
587           (GstTagList *) tags_st, GST_TAG_MERGE_REPLACE);
588       gst_structure_free (tags_st);
589     }
590
591     return (GstStreamInformation *) info;
592
593   } else {
594     /* None of the above - populate what information we can */
595     GstStreamInformation *info;
596
597     if (parent)
598       info = parent;
599     else {
600       info = gst_stream_information_new ();
601       info->caps = caps;
602     }
603
604     if (gst_structure_id_get ((GstStructure *) st, _TAGS_QUARK,
605             GST_TYPE_STRUCTURE, &tags_st, NULL)) {
606       info->tags = gst_tag_list_merge (info->tags, (GstTagList *) tags_st,
607           GST_TAG_MERGE_REPLACE);
608       gst_structure_free (tags_st);
609     }
610
611     return info;
612   }
613
614 }
615
616 static GstStructure *
617 find_stream_for_node (GstDiscovererInternal * dc, const GstStructure * topology)
618 {
619   GstPad *pad;
620   GstPad *target_pad = NULL;
621   GstStructure *st = NULL;
622   PrivateStream *ps;
623   guint i;
624   GList *tmp;
625
626   if (!gst_structure_id_has_field (topology, _TOPOLOGY_PAD_QUARK)) {
627     GST_DEBUG ("Could not find pad for node %" GST_PTR_FORMAT "\n", topology);
628     return NULL;
629   }
630
631   gst_structure_id_get ((GstStructure *) topology, _TOPOLOGY_PAD_QUARK,
632       GST_TYPE_PAD, &pad, NULL);
633
634   if (!dc->streams)
635     return NULL;
636
637   for (i = 0, tmp = dc->streams; tmp; tmp = tmp->next, i++) {
638     ps = (PrivateStream *) tmp->data;
639
640     target_pad = gst_ghost_pad_get_target (GST_GHOST_PAD (ps->pad));
641     gst_object_unref (target_pad);
642
643     if (target_pad == pad)
644       break;
645   }
646
647   if (tmp)
648     st = collect_stream_information (dc, ps, i);
649
650   gst_object_unref (pad);
651
652   return st;
653 }
654
655 static gboolean
656 child_is_raw_stream (GstCaps * parent, GstCaps * child)
657 {
658   GstStructure *st1, *st2;
659   const gchar *name1, *name2;
660
661   st1 = gst_caps_get_structure (parent, 0);
662   name1 = gst_structure_get_name (st1);
663   st2 = gst_caps_get_structure (child, 0);
664   name2 = gst_structure_get_name (st2);
665
666   if ((g_str_has_prefix (name1, "audio/") &&
667           g_str_has_prefix (name2, "audio/x-raw")) ||
668       ((g_str_has_prefix (name1, "video/") ||
669               g_str_has_prefix (name1, "image/")) &&
670           g_str_has_prefix (name2, "video/x-raw"))) {
671     /* child is the "raw" sub-stream corresponding to parent */
672     return TRUE;
673   }
674
675   return FALSE;
676 }
677
678 /* If a parent is non-NULL, collected stream information will be appended to it
679  * (and where the information exists, it will be overriden)
680  */
681 static GstStreamInformation *
682 parse_stream_topology (GstDiscovererInternal * dc, const GstStructure * topology,
683     GstStreamInformation * parent)
684 {
685   GstStreamInformation *res = NULL;
686   GstCaps *caps = NULL;
687   const GValue *nval = NULL;
688
689   GST_DEBUG ("parsing: %" GST_PTR_FORMAT, topology);
690
691   nval = gst_structure_get_value (topology, "next");
692
693   if (nval == NULL || GST_VALUE_HOLDS_STRUCTURE (nval)) {
694     GstStructure *st = find_stream_for_node (dc, topology);
695     gboolean add_to_list = TRUE;
696
697     if (st) {
698       res = collect_information (dc, st, parent);
699       gst_structure_free (st);
700     } else {
701       /* Didn't find a stream structure, so let's just use the caps we have */
702       res = collect_information (dc, topology, parent);
703     }
704
705     if (nval == NULL) {
706       /* FIXME : aggregate with information from main streams */
707       GST_DEBUG ("Coudn't find 'next' ! might be the last entry");
708     } else {
709       GstCaps *caps;
710       const GstStructure *st;
711
712       GST_DEBUG ("next is a structure %" GST_PTR_FORMAT);
713
714       st = gst_value_get_structure (nval);
715
716       if (!parent)
717         parent = res;
718
719       if (gst_structure_id_get ((GstStructure *) st, _CAPS_QUARK, GST_TYPE_CAPS,
720               &caps, NULL)) {
721         if (gst_caps_can_intersect (parent->caps, caps)) {
722           /* We sometimes get an extra sub-stream from the parser. If this is
723            * the case, we just replace the parent caps with this stream's caps
724            * since they might contain more information */
725           gst_caps_unref (parent->caps);
726           parent->caps = caps;
727
728           parse_stream_topology (dc, st, parent);
729           add_to_list = FALSE;
730
731         } else if (child_is_raw_stream (parent->caps, caps)) {
732           /* This is the "raw" stream corresponding to the parent. This
733            * contains more information than the parent, tags etc. */
734           parse_stream_topology (dc, st, parent);
735           add_to_list = FALSE;
736           gst_caps_unref (caps);
737
738         } else {
739           GstStreamInformation *next = parse_stream_topology (dc, st, NULL);
740           res->next = next;
741           next->previous = res;
742         }
743       }
744     }
745
746     if (add_to_list) {
747       dc->current_info->stream_list =
748           g_list_append (dc->current_info->stream_list, res);
749     }
750
751   } else if (GST_VALUE_HOLDS_LIST (nval)) {
752     guint i, len;
753     GstStreamContainerInformation *cont;
754     GstTagList *tags;
755
756     if (!gst_structure_id_get ((GstStructure *) topology, _CAPS_QUARK,
757             GST_TYPE_CAPS, &caps, NULL))
758       GST_WARNING ("Couldn't find caps !");
759
760     len = gst_value_list_get_size (nval);
761     GST_DEBUG ("next is a list of %d entries", len);
762
763     cont = gst_stream_container_information_new ();
764     cont->parent.caps = caps;
765     res = (GstStreamInformation *) cont;
766
767     if (gst_structure_id_has_field (topology, _TAGS_QUARK)) {
768       gst_structure_id_get ((GstStructure *) topology, _TAGS_QUARK,
769           GST_TYPE_STRUCTURE, &tags, NULL);
770       cont->parent.tags =
771           gst_tag_list_merge (cont->parent.tags, (GstTagList *) tags,
772           GST_TAG_MERGE_APPEND);
773       gst_tag_list_free (tags);
774     }
775
776     for (i = 0; i < len; i++) {
777       const GValue *subv = gst_value_list_get_value (nval, i);
778       const GstStructure *subst = gst_value_get_structure (subv);
779       GstStreamInformation *substream;
780
781       GST_DEBUG ("%d %" GST_PTR_FORMAT, i, subst);
782
783       substream = parse_stream_topology (dc, subst, NULL);
784
785       substream->previous = res;
786       cont->streams = g_list_append (cont->streams, substream);
787     }
788   }
789
790   return res;
791 }
792
793 /* Called when pipeline is pre-rolled */
794 static void
795 discoverer_collect (GstDiscovererInternal * dc)
796 {
797   GST_DEBUG ("Collecting information");
798
799   if (dc->streams) {
800     /* FIXME : Make this querying optional */
801     if (TRUE) {
802       GstFormat format = GST_FORMAT_TIME;
803       gint64 dur;
804
805       GST_DEBUG ("Attempting to query duration");
806
807       if (gst_element_query_duration ((GstElement *) dc->pipeline, &format,
808               &dur)) {
809         if (format == GST_FORMAT_TIME) {
810           GST_DEBUG ("Got duration %" GST_TIME_FORMAT, GST_TIME_ARGS (dur));
811           dc->current_info->duration = (guint64) dur;
812         }
813       }
814     }
815
816     if (dc->current_topology)
817       dc->current_info->stream_info = parse_stream_topology (dc,
818           dc->current_topology, NULL);
819
820     /*
821      * Images need some special handling. They do not have a duration, have
822      * caps named image/<foo> (th exception being MJPEG video which is also
823      * type image/jpeg), and should consist of precisely one stream (actually
824      * initially there are 2, the image and raw stream, but we squash these
825      * while parsing the stream topology). At some ponit, if we find that these
826      * conditions are not sufficient, we can count the number of decoders and
827      * parsers in the chain, and if there's more than one decoder, or any
828      * parser at all, we should not mark this as an image.
829      */
830     if (dc->current_info->duration == 0 &&
831         dc->current_info->stream_info != NULL &&
832         dc->current_info->stream_info->next == NULL) {
833       GstStructure *st =
834           gst_caps_get_structure (dc->current_info->stream_info->caps, 0);
835
836       if (g_str_has_prefix (gst_structure_get_name (st), "image/"))
837         dc->current_info->stream_info->streamtype = GST_STREAM_IMAGE;
838     }
839   }
840
841   if (dc->async) {
842     GST_DEBUG ("Emitting 'discoverered'");
843     g_signal_emit (dc, gst_discoverer_signals[SIGNAL_DISCOVERED], 0,
844         dc->current_info, dc->current_error);
845     /* Clients get a copy of current_info since it is a boxed type */
846     gst_discoverer_information_free (dc->current_info);
847   }
848 }
849
850 static void
851 handle_current_async (GstDiscovererInternal * dc)
852 {
853   /* FIXME : TIMEOUT ! */
854 }
855
856
857 /* Returns TRUE if processing should stop */
858 static gboolean
859 handle_message (GstDiscovererInternal * dc, GstMessage * msg)
860 {
861   gboolean done = FALSE;
862
863   GST_DEBUG ("got a %s message", GST_MESSAGE_TYPE_NAME (msg));
864
865   switch (GST_MESSAGE_TYPE (msg)) {
866     case GST_MESSAGE_ERROR:{
867       GError *gerr;
868       gchar *debug;
869
870       gst_message_parse_error (msg, &gerr, &debug);
871       GST_WARNING ("Got an error [debug:%s]", debug);
872       dc->current_error = gerr;
873       g_free (debug);
874
875       /* We need to stop */
876       done = TRUE;
877
878       dc->current_info->result |= GST_DISCOVERER_ERROR;
879     }
880       break;
881
882     case GST_MESSAGE_EOS:
883       GST_DEBUG ("Got EOS !");
884       done = TRUE;
885       break;
886
887     case GST_MESSAGE_ASYNC_DONE:
888       if (GST_MESSAGE_SRC (msg) == (GstObject *) dc->pipeline) {
889         GST_DEBUG ("Finished changing state asynchronously");
890         done = TRUE;
891
892       }
893       break;
894
895     case GST_MESSAGE_ELEMENT:
896     {
897       GQuark sttype = gst_structure_get_name_id (msg->structure);
898       GST_DEBUG_OBJECT (GST_MESSAGE_SRC (msg),
899           "structure %" GST_PTR_FORMAT, msg->structure);
900       if (sttype == _MISSING_PLUGIN_QUARK) {
901         dc->current_info->result |= GST_DISCOVERER_MISSING_PLUGINS;
902         dc->current_info->misc = gst_structure_copy (msg->structure);
903       } else if (sttype == _STREAM_TOPOLOGY_QUARK) {
904         dc->current_topology = gst_structure_copy (msg->structure);
905       }
906     }
907       break;
908
909     case GST_MESSAGE_TAG:
910     {
911       GstTagList *tl;
912
913       gst_message_parse_tag (msg, &tl);
914       GST_DEBUG ("Got tags %" GST_PTR_FORMAT, tl);
915       /* Merge with current tags */
916       dc->current_info->tags =
917           gst_tag_list_merge (dc->current_info->tags, tl, GST_TAG_MERGE_APPEND);
918       gst_tag_list_free (tl);
919     }
920       break;
921
922     default:
923       break;
924   }
925
926   return done;
927 }
928
929
930 static void
931 handle_current_sync (GstDiscovererInternal * dc)
932 {
933   GTimer *timer;
934   gdouble deadline = ((gdouble) dc->timeout) / GST_SECOND;
935   GstMessage *msg;
936   gboolean done = FALSE;
937
938   timer = g_timer_new ();
939   g_timer_start (timer);
940
941   do {
942     /* poll bus with timeout */
943     /* FIXME : make the timeout more fine-tuned */
944     if ((msg = gst_bus_timed_pop (dc->bus, GST_SECOND / 2))) {
945       done = handle_message (dc, msg);
946       gst_message_unref (msg);
947     }
948
949   } while (!done && (g_timer_elapsed (timer, NULL) < deadline));
950
951   /* return result */
952   if (!done) {
953     GST_DEBUG ("we timed out!");
954     dc->current_info->result |= GST_DISCOVERER_TIMEOUT;
955   }
956
957   GST_DEBUG ("Done");
958
959   g_timer_stop (timer);
960   g_timer_destroy (timer);
961 }
962
963 static void
964 _setup_locked (GstDiscovererInternal * dc)
965 {
966   GstStateChangeReturn ret;
967
968   GST_DEBUG ("Setting up");
969
970   /* Pop URI off the pending URI list */
971   dc->current_info = gst_discoverer_information_new ();
972   dc->current_info->uri = (gchar *) dc->pending_uris->data;
973   dc->pending_uris = g_list_delete_link (dc->pending_uris, dc->pending_uris);
974
975   /* set uri on uridecodebin */
976   g_object_set (dc->uridecodebin, "uri", dc->current_info->uri, NULL);
977
978   GST_DEBUG ("Current is now %s", dc->current_info->uri);
979
980   /* set pipeline to PAUSED */
981   dc->running = TRUE;
982
983   DISCO_UNLOCK (dc);
984   GST_DEBUG ("Setting pipeline to PAUSED");
985   ret = gst_element_set_state ((GstElement *) dc->pipeline, GST_STATE_PAUSED);
986   DISCO_LOCK (dc);
987
988   GST_DEBUG_OBJECT (dc, "Pipeline going to PAUSED : %s",
989       gst_element_state_change_return_get_name (ret));
990 }
991
992 static void
993 discoverer_cleanup (GstDiscovererInternal * dc)
994 {
995   GST_DEBUG ("Cleaning up");
996
997   gst_bus_set_flushing (dc->bus, TRUE);
998   gst_element_set_state ((GstElement *) dc->pipeline, GST_STATE_READY);
999   gst_bus_set_flushing (dc->bus, FALSE);
1000
1001   DISCO_LOCK (dc);
1002   if (dc->current_error)
1003     g_error_free (dc->current_error);
1004   dc->current_error = NULL;
1005   if (dc->current_topology) {
1006     gst_structure_free (dc->current_topology);
1007     dc->current_topology = NULL;
1008   }
1009
1010   dc->current_info = NULL;
1011
1012   /* Try popping the next uri */
1013   if (dc->async) {
1014     if (dc->pending_uris != NULL) {
1015       _setup_locked (dc);
1016       DISCO_UNLOCK (dc);
1017       /* Start timeout */
1018       handle_current_async (dc);
1019     } else {
1020       /* We're done ! */
1021       DISCO_UNLOCK (dc);
1022       g_signal_emit (dc, gst_discoverer_signals[SIGNAL_READY], 0);
1023     }
1024   } else
1025     DISCO_UNLOCK (dc);
1026
1027   GST_DEBUG ("out");
1028 }
1029
1030 static void
1031 discoverer_bus_cb (GstBus * bus, GstMessage * msg, GstDiscovererInternal * dc)
1032 {
1033   GST_DEBUG ("dc->running:%d", dc->running);
1034   if (dc->running) {
1035     if (handle_message (dc, msg)) {
1036       GST_DEBUG ("Stopping asynchronously");
1037       dc->running = FALSE;
1038       discoverer_collect (dc);
1039       discoverer_cleanup (dc);
1040     }
1041   }
1042 }
1043
1044
1045
1046 /* If there is a pending URI, it will pop it from the list of pending
1047  * URIs and start the discovery on it.
1048  *
1049  * Returns GST_DISCOVERER_OK if the next URI was popped and is processing,
1050  * else a error flag.
1051  */
1052 static GstDiscovererResult
1053 start_discovering (GstDiscovererInternal * dc)
1054 {
1055   GstDiscovererResult res = GST_DISCOVERER_OK;
1056
1057   GST_DEBUG ("Starting");
1058
1059   DISCO_LOCK (dc);
1060   if (dc->pending_uris == NULL) {
1061     GST_WARNING ("No URI to process");
1062     res |= GST_DISCOVERER_URI_INVALID;
1063     DISCO_UNLOCK (dc);
1064     goto beach;
1065   }
1066
1067   if (dc->current_info != NULL) {
1068     GST_WARNING ("Already processing a file");
1069     res |= GST_DISCOVERER_BUSY;
1070     DISCO_UNLOCK (dc);
1071     goto beach;
1072   }
1073
1074   _setup_locked (dc);
1075
1076   DISCO_UNLOCK (dc);
1077
1078   if (dc->async)
1079     handle_current_async (dc);
1080   else
1081     handle_current_sync (dc);
1082
1083 beach:
1084   return res;
1085 }
1086
1087
1088 /**
1089  * gst_discoverer_internal_start:
1090  * @discoverer: A #GstDiscovererInternal
1091  * 
1092  * Allow asynchronous discovering of URIs to take place.
1093  */
1094 void
1095 gst_discoverer_internal_start (GstDiscovererInternal * discoverer)
1096 {
1097   GST_DEBUG_OBJECT (discoverer, "Starting...");
1098
1099   if (discoverer->async) {
1100     GST_DEBUG_OBJECT (discoverer, "We were already started");
1101     return;
1102   }
1103
1104   discoverer->async = TRUE;
1105   /* Connect to bus signals */
1106   gst_bus_add_signal_watch (discoverer->bus);
1107
1108   start_discovering (discoverer);
1109   GST_DEBUG_OBJECT (discoverer, "Started");
1110 }
1111
1112 /**
1113  * gst_discoverer_internal_stop:
1114  * @discoverer: A #GstDiscovererInternal
1115  *
1116  * Stop the discovery of any pending URIs and clears the list of
1117  * pending URIS (if any).
1118  */
1119 void
1120 gst_discoverer_internal_stop (GstDiscovererInternal * discoverer)
1121 {
1122   GST_DEBUG_OBJECT (discoverer, "Stopping...");
1123
1124   if (!discoverer->async) {
1125     GST_DEBUG_OBJECT (discoverer,
1126         "We were already stopped, or running synchronously");
1127     return;
1128   }
1129
1130   DISCO_LOCK (discoverer);
1131   if (discoverer->running) {
1132     /* FIXME : Stop any ongoing discovery */
1133   }
1134   DISCO_UNLOCK (discoverer);
1135
1136   /* Remove signal watch */
1137   gst_bus_remove_signal_watch (discoverer->bus);
1138   discoverer_reset (discoverer);
1139
1140   discoverer->async = FALSE;
1141
1142   GST_DEBUG_OBJECT (discoverer, "Stopped");
1143 }
1144
1145 /**
1146  * gst_discoverer_internal_append_uri:
1147  * @discoverer: A #GstDiscovererInternal
1148  * @uri: the URI to add.
1149  *
1150  * Appends the given @uri to the list of URIs to discoverer. The actual
1151  * discovery of the @uri will only take place if @gst_discoverer_start has
1152  * been called.
1153  *
1154  * A copy of @uri will be done internally, the caller can safely %g_free afterwards.
1155  *
1156  * Returns: TRUE if the @uri was succesfully appended to the list of pending
1157  * uris, else FALSE
1158  */
1159 gboolean
1160 gst_discoverer_internal_append_uri (GstDiscovererInternal * discoverer, gchar * uri)
1161 {
1162   gboolean can_run;
1163
1164   GST_DEBUG_OBJECT (discoverer, "uri : %s", uri);
1165
1166   DISCO_LOCK (discoverer);
1167   can_run = (discoverer->pending_uris == NULL);
1168   discoverer->pending_uris =
1169       g_list_append (discoverer->pending_uris, g_strdup (uri));
1170   DISCO_UNLOCK (discoverer);
1171
1172   if (can_run)
1173     start_discovering (discoverer);
1174
1175   return TRUE;
1176 }
1177
1178
1179 /* Synchronous mode */
1180 /**
1181  * gst_discoverer_internal_discover_uri:
1182  *
1183  * @discoverer: A #GstDiscovererInternal
1184  * @uri: The URI to run on.
1185  * @err: If an error occured, this field will be filled in.
1186  *
1187  * Synchronously discovers the given @uri.
1188  *
1189  * A copy of @uri will be done internally, the caller can safely %g_free afterwards.
1190  *
1191  * Returns: (transfer none): see #GstDiscovererInformation. The caller must free this structure
1192  * after use.
1193  */
1194 GstDiscovererInformation *
1195 gst_discoverer_internal_discover_uri (GstDiscovererInternal * discoverer, gchar * uri,
1196     GError ** err)
1197 {
1198   GstDiscovererResult res = 0;
1199   GstDiscovererInformation *info;
1200
1201   GST_DEBUG_OBJECT (discoverer, "uri:%s", uri);
1202
1203   DISCO_LOCK (discoverer);
1204   if (G_UNLIKELY (discoverer->current_info)) {
1205     DISCO_UNLOCK (discoverer);
1206     GST_WARNING_OBJECT (discoverer, "Already handling a uri");
1207     return NULL;
1208   }
1209
1210   discoverer->pending_uris =
1211       g_list_append (discoverer->pending_uris, g_strdup (uri));
1212   DISCO_UNLOCK (discoverer);
1213
1214   res = start_discovering (discoverer);
1215   discoverer_collect (discoverer);
1216
1217   /* Get results */
1218   if (discoverer->current_error)
1219     *err = g_error_copy (discoverer->current_error);
1220   else
1221     *err = NULL;
1222   discoverer->current_info->result |= res;
1223   info = discoverer->current_info;
1224
1225   discoverer_cleanup (discoverer);
1226
1227   return info;
1228 }
1229
1230 /**
1231  * gst_discoverer_internal_new:
1232  * @timeout: The timeout to set on the discoverer
1233  *
1234  * Creates a new #GstDiscovererInternal with the provided timeout.
1235  *
1236  * Returns: The new #GstDiscovererInternal. Free with g_object_unref() when done.
1237  */
1238 GstDiscovererInternal *
1239 gst_discoverer_internal_new (GstClockTime timeout)
1240 {
1241   return g_object_new (GST_TYPE_DISCOVERER_INTERNAL, "timeout", timeout, NULL);
1242 }