pulse: Drop support for PA versions before 0.9.16
[gstreamer-omap:gst-plugins-good.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * |[
40  * gst-launch -v audiotestsrc ! pulsesink stream-properties="props,media.title=test"
41  * ]| Play a sine wave and set a stream property. The property can be checked
42  * with "pactl list".
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #include "config.h"
48 #endif
49
50 #include <string.h>
51 #include <stdio.h>
52
53 #include <gst/base/gstbasesink.h>
54 #include <gst/gsttaglist.h>
55 #include <gst/interfaces/streamvolume.h>
56 #include <gst/gst-i18n-plugin.h>
57
58 #include <gst/pbutils/pbutils.h>        /* only used for GST_PLUGINS_BASE_VERSION_* */
59
60 #include "pulsesink.h"
61 #include "pulseutil.h"
62
63 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
64 #define GST_CAT_DEFAULT pulse_debug
65
66 #define DEFAULT_SERVER          NULL
67 #define DEFAULT_DEVICE          NULL
68 #define DEFAULT_DEVICE_NAME     NULL
69 #define DEFAULT_VOLUME          1.0
70 #define DEFAULT_MUTE            FALSE
71 #define MAX_VOLUME              10.0
72
73 enum
74 {
75   PROP_0,
76   PROP_SERVER,
77   PROP_DEVICE,
78   PROP_DEVICE_NAME,
79   PROP_VOLUME,
80   PROP_MUTE,
81   PROP_CLIENT,
82   PROP_STREAM_PROPERTIES,
83   PROP_LAST
84 };
85
86 #define GST_TYPE_PULSERING_BUFFER        \
87         (gst_pulseringbuffer_get_type())
88 #define GST_PULSERING_BUFFER(obj)        \
89         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
90 #define GST_PULSERING_BUFFER_CLASS(klass) \
91         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
92 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
93         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
94 #define GST_PULSERING_BUFFER_CAST(obj)        \
95         ((GstPulseRingBuffer *)obj)
96 #define GST_IS_PULSERING_BUFFER(obj)     \
97         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
98 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
99         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
100
101 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
102 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
103
104 typedef struct _GstPulseContext GstPulseContext;
105
106 /* Store the PA contexts in a hash table to allow easy sharing among
107  * multiple instances of the sink. Keys are $context_name@$server_name
108  * (strings) and values should be GstPulseContext pointers.
109  */
110 struct _GstPulseContext
111 {
112   pa_context *context;
113   GSList *ring_buffers;
114 };
115
116 static GHashTable *gst_pulse_shared_contexts = NULL;
117
118 /* use one static main-loop for all instances
119  * this is needed to make the context sharing work as the contexts are
120  * released when releasing their parent main-loop
121  */
122 static pa_threaded_mainloop *mainloop = NULL;
123 static guint mainloop_ref_ct = 0;
124
125 /* lock for access to shared resources */
126 static GMutex *pa_shared_resource_mutex = NULL;
127
128 /* We keep a custom ringbuffer that is backed up by data allocated by
129  * pulseaudio. We must also overide the commit function to write into
130  * pulseaudio memory instead. */
131 struct _GstPulseRingBuffer
132 {
133   GstRingBuffer object;
134
135   gchar *context_name;
136   gchar *stream_name;
137
138   pa_context *context;
139   pa_stream *stream;
140
141   pa_sample_spec sample_spec;
142
143   void *m_data;
144   size_t m_towrite;
145   size_t m_writable;
146   gint64 m_offset;
147   gint64 m_lastoffset;
148
149   gboolean corked:1;
150   gboolean in_commit:1;
151   gboolean paused:1;
152 };
153 struct _GstPulseRingBufferClass
154 {
155   GstRingBufferClass parent_class;
156 };
157
158 static GType gst_pulseringbuffer_get_type (void);
159 static void gst_pulseringbuffer_finalize (GObject * object);
160
161 static GstRingBufferClass *ring_parent_class = NULL;
162
163 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
164 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
165 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
166     GstRingBufferSpec * spec);
167 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
168 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
169 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
170 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
171 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
172 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
173     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
174     gint * accum);
175
176 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
177
178 static void
179 gst_pulsesink_init_contexts (void)
180 {
181   g_assert (pa_shared_resource_mutex == NULL);
182   pa_shared_resource_mutex = g_mutex_new ();
183   gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal,
184       g_free, NULL);
185 }
186
187 static void
188 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
189 {
190   GObjectClass *gobject_class;
191   GstRingBufferClass *gstringbuffer_class;
192
193   gobject_class = (GObjectClass *) klass;
194   gstringbuffer_class = (GstRingBufferClass *) klass;
195
196   ring_parent_class = g_type_class_peek_parent (klass);
197
198   gobject_class->finalize = gst_pulseringbuffer_finalize;
199
200   gstringbuffer_class->open_device =
201       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
202   gstringbuffer_class->close_device =
203       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
204   gstringbuffer_class->acquire =
205       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
206   gstringbuffer_class->release =
207       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
208   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
209   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
210   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
211   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
212   gstringbuffer_class->clear_all =
213       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
214
215   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
216 }
217
218 static void
219 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
220 {
221   pbuf->stream_name = NULL;
222   pbuf->context = NULL;
223   pbuf->stream = NULL;
224
225   pa_sample_spec_init (&pbuf->sample_spec);
226
227   pbuf->m_data = NULL;
228   pbuf->m_towrite = 0;
229   pbuf->m_writable = 0;
230   pbuf->m_offset = 0;
231   pbuf->m_lastoffset = 0;
232
233   pbuf->corked = TRUE;
234   pbuf->in_commit = FALSE;
235   pbuf->paused = FALSE;
236 }
237
238 static void
239 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
240 {
241   if (pbuf->stream) {
242
243     if (pbuf->m_data) {
244       /* drop shm memory buffer */
245       pa_stream_cancel_write (pbuf->stream);
246
247       /* reset internal variables */
248       pbuf->m_data = NULL;
249       pbuf->m_towrite = 0;
250       pbuf->m_writable = 0;
251       pbuf->m_offset = 0;
252       pbuf->m_lastoffset = 0;
253     }
254
255     pa_stream_disconnect (pbuf->stream);
256
257     /* Make sure we don't get any further callbacks */
258     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
259     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
260     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
261     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
262
263     pa_stream_unref (pbuf->stream);
264     pbuf->stream = NULL;
265   }
266
267   g_free (pbuf->stream_name);
268   pbuf->stream_name = NULL;
269 }
270
271 static void
272 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
273 {
274   g_mutex_lock (pa_shared_resource_mutex);
275
276   GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf);
277
278   gst_pulsering_destroy_stream (pbuf);
279
280   if (pbuf->context) {
281     pa_context_unref (pbuf->context);
282     pbuf->context = NULL;
283   }
284
285   if (pbuf->context_name) {
286     GstPulseContext *pctx;
287
288     pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
289
290     GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p",
291         pbuf->context_name, pbuf, pctx);
292
293     if (pctx) {
294       pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
295       if (pctx->ring_buffers == NULL) {
296         GST_DEBUG_OBJECT (pbuf,
297             "destroying final context with name %s, pbuf=%p, pctx=%p",
298             pbuf->context_name, pbuf, pctx);
299
300         pa_context_disconnect (pctx->context);
301
302         /* Make sure we don't get any further callbacks */
303         pa_context_set_state_callback (pctx->context, NULL, NULL);
304         pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
305
306         g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
307
308         pa_context_unref (pctx->context);
309         g_slice_free (GstPulseContext, pctx);
310       }
311     }
312     g_free (pbuf->context_name);
313     pbuf->context_name = NULL;
314   }
315   g_mutex_unlock (pa_shared_resource_mutex);
316 }
317
318 static void
319 gst_pulseringbuffer_finalize (GObject * object)
320 {
321   GstPulseRingBuffer *ringbuffer;
322
323   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
324
325   gst_pulsering_destroy_context (ringbuffer);
326   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
327 }
328
329
330 #define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
331 #define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))
332
333 static gboolean
334 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf,
335     gboolean check_stream)
336 {
337   if (!CONTEXT_OK (pbuf->context))
338     goto error;
339
340   if (check_stream && !STREAM_OK (pbuf->stream))
341     goto error;
342
343   return FALSE;
344
345 error:
346   {
347     const gchar *err_str =
348         pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL;
349     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
350             err_str), (NULL));
351     return TRUE;
352   }
353 }
354
355 static void
356 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
357 {
358   pa_context_state_t state;
359   pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata;
360
361   state = pa_context_get_state (c);
362
363   GST_LOG ("got new context state %d", state);
364
365   switch (state) {
366     case PA_CONTEXT_READY:
367     case PA_CONTEXT_TERMINATED:
368     case PA_CONTEXT_FAILED:
369       GST_LOG ("signaling");
370       pa_threaded_mainloop_signal (mainloop, 0);
371       break;
372
373     case PA_CONTEXT_UNCONNECTED:
374     case PA_CONTEXT_CONNECTING:
375     case PA_CONTEXT_AUTHORIZING:
376     case PA_CONTEXT_SETTING_NAME:
377       break;
378   }
379 }
380
381 static void
382 gst_pulsering_context_subscribe_cb (pa_context * c,
383     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
384 {
385   GstPulseSink *psink;
386   GstPulseContext *pctx = (GstPulseContext *) userdata;
387   GSList *walk;
388
389   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
390       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
391     return;
392
393   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
394     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
395     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
396
397     GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
398
399     if (!pbuf->stream)
400       continue;
401
402     if (idx != pa_stream_get_index (pbuf->stream))
403       continue;
404
405     /* Actually this event is also triggered when other properties of
406      * the stream change that are unrelated to the volume. However it is
407      * probably cheaper to signal the change here and check for the
408      * volume when the GObject property is read instead of querying it always. */
409
410     /* inform streaming thread to notify */
411     g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
412   }
413 }
414
415 /* will be called when the device should be opened. In this case we will connect
416  * to the server. We should not try to open any streams in this state. */
417 static gboolean
418 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
419 {
420   GstPulseSink *psink;
421   GstPulseRingBuffer *pbuf;
422   GstPulseContext *pctx;
423   pa_mainloop_api *api;
424   gboolean need_unlock_shared;
425
426   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
427   pbuf = GST_PULSERING_BUFFER_CAST (buf);
428
429   g_assert (!pbuf->stream);
430   g_assert (psink->client_name);
431
432   if (psink->server)
433     pbuf->context_name = g_strdup_printf ("%s@%s", psink->client_name,
434         psink->server);
435   else
436     pbuf->context_name = g_strdup (psink->client_name);
437
438   pa_threaded_mainloop_lock (mainloop);
439
440   g_mutex_lock (pa_shared_resource_mutex);
441   need_unlock_shared = TRUE;
442
443   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
444   if (pctx == NULL) {
445     pctx = g_slice_new0 (GstPulseContext);
446
447     /* get the mainloop api and create a context */
448     GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p",
449         pbuf->context_name, pbuf, pctx);
450     api = pa_threaded_mainloop_get_api (mainloop);
451     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
452       goto create_failed;
453
454     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
455     g_hash_table_insert (gst_pulse_shared_contexts,
456         g_strdup (pbuf->context_name), (gpointer) pctx);
457     /* register some essential callbacks */
458     pa_context_set_state_callback (pctx->context,
459         gst_pulsering_context_state_cb, mainloop);
460     pa_context_set_subscribe_callback (pctx->context,
461         gst_pulsering_context_subscribe_cb, pctx);
462
463     /* try to connect to the server and wait for completion, we don't want to
464      * autospawn a deamon */
465     GST_LOG_OBJECT (psink, "connect to server %s",
466         GST_STR_NULL (psink->server));
467     if (pa_context_connect (pctx->context, psink->server,
468             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
469       goto connect_failed;
470   } else {
471     GST_INFO_OBJECT (psink,
472         "reusing shared context with name %s, pbuf=%p, pctx=%p",
473         pbuf->context_name, pbuf, pctx);
474     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
475   }
476
477   g_mutex_unlock (pa_shared_resource_mutex);
478   need_unlock_shared = FALSE;
479
480   /* context created or shared okay */
481   pbuf->context = pa_context_ref (pctx->context);
482
483   for (;;) {
484     pa_context_state_t state;
485
486     state = pa_context_get_state (pbuf->context);
487
488     GST_LOG_OBJECT (psink, "context state is now %d", state);
489
490     if (!PA_CONTEXT_IS_GOOD (state))
491       goto connect_failed;
492
493     if (state == PA_CONTEXT_READY)
494       break;
495
496     /* Wait until the context is ready */
497     GST_LOG_OBJECT (psink, "waiting..");
498     pa_threaded_mainloop_wait (mainloop);
499   }
500
501   GST_LOG_OBJECT (psink, "opened the device");
502
503   pa_threaded_mainloop_unlock (mainloop);
504
505   return TRUE;
506
507   /* ERRORS */
508 unlock_and_fail:
509   {
510     if (need_unlock_shared)
511       g_mutex_unlock (pa_shared_resource_mutex);
512     gst_pulsering_destroy_context (pbuf);
513     pa_threaded_mainloop_unlock (mainloop);
514     return FALSE;
515   }
516 create_failed:
517   {
518     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
519         ("Failed to create context"), (NULL));
520     g_slice_free (GstPulseContext, pctx);
521     goto unlock_and_fail;
522   }
523 connect_failed:
524   {
525     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
526             pa_strerror (pa_context_errno (pctx->context))), (NULL));
527     goto unlock_and_fail;
528   }
529 }
530
531 /* close the device */
532 static gboolean
533 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
534 {
535   GstPulseSink *psink;
536   GstPulseRingBuffer *pbuf;
537
538   pbuf = GST_PULSERING_BUFFER_CAST (buf);
539   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
540
541   GST_LOG_OBJECT (psink, "closing device");
542
543   pa_threaded_mainloop_lock (mainloop);
544   gst_pulsering_destroy_context (pbuf);
545   pa_threaded_mainloop_unlock (mainloop);
546
547   GST_LOG_OBJECT (psink, "closed device");
548
549   return TRUE;
550 }
551
552 static void
553 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
554 {
555   GstPulseSink *psink;
556   GstPulseRingBuffer *pbuf;
557   pa_stream_state_t state;
558
559   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
560   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
561
562   state = pa_stream_get_state (s);
563   GST_LOG_OBJECT (psink, "got new stream state %d", state);
564
565   switch (state) {
566     case PA_STREAM_READY:
567     case PA_STREAM_FAILED:
568     case PA_STREAM_TERMINATED:
569       GST_LOG_OBJECT (psink, "signaling");
570       pa_threaded_mainloop_signal (mainloop, 0);
571       break;
572     case PA_STREAM_UNCONNECTED:
573     case PA_STREAM_CREATING:
574       break;
575   }
576 }
577
578 static void
579 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
580 {
581   GstPulseSink *psink;
582   GstRingBuffer *rbuf;
583   GstPulseRingBuffer *pbuf;
584
585   rbuf = GST_RING_BUFFER_CAST (userdata);
586   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
587   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
588
589   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
590
591   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
592     /* only signal when we are waiting in the commit thread
593      * and got request for atleast a segment */
594     pa_threaded_mainloop_signal (mainloop, 0);
595   }
596 }
597
598 static void
599 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
600 {
601   GstPulseSink *psink;
602   GstPulseRingBuffer *pbuf;
603
604   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
605   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
606
607   GST_WARNING_OBJECT (psink, "Got underflow");
608 }
609
610 static void
611 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
612 {
613   GstPulseSink *psink;
614   GstPulseRingBuffer *pbuf;
615
616   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
617   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
618
619   GST_WARNING_OBJECT (psink, "Got overflow");
620 }
621
622 static void
623 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
624 {
625   GstPulseSink *psink;
626   GstPulseRingBuffer *pbuf;
627   const pa_timing_info *info;
628   pa_usec_t sink_usec;
629
630   info = pa_stream_get_timing_info (s);
631
632   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
633   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
634
635   if (!info) {
636     GST_LOG_OBJECT (psink, "latency update (information unknown)");
637     return;
638   }
639   sink_usec = info->configured_sink_usec;
640
641   GST_LOG_OBJECT (psink,
642       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
643       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
644       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
645       info->write_index, info->read_index_corrupt, info->read_index,
646       info->sink_usec, sink_usec);
647 }
648
649 static void
650 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
651 {
652   GstPulseSink *psink;
653   GstPulseRingBuffer *pbuf;
654
655   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
656   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
657
658   if (pa_stream_is_suspended (p))
659     GST_DEBUG_OBJECT (psink, "stream suspended");
660   else
661     GST_DEBUG_OBJECT (psink, "stream resumed");
662 }
663
664 static void
665 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
666 {
667   GstPulseSink *psink;
668   GstPulseRingBuffer *pbuf;
669
670   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
671   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
672
673   GST_DEBUG_OBJECT (psink, "stream started");
674 }
675
676 static void
677 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
678     pa_proplist * pl, void *userdata)
679 {
680   GstPulseSink *psink;
681   GstPulseRingBuffer *pbuf;
682
683   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
684   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
685
686   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
687     /* the stream wants to PAUSE, post a message for the application. */
688     GST_DEBUG_OBJECT (psink, "got request for CORK");
689     gst_element_post_message (GST_ELEMENT_CAST (psink),
690         gst_message_new_request_state (GST_OBJECT_CAST (psink),
691             GST_STATE_PAUSED));
692
693   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
694     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
695     gst_element_post_message (GST_ELEMENT_CAST (psink),
696         gst_message_new_request_state (GST_OBJECT_CAST (psink),
697             GST_STATE_PLAYING));
698   } else {
699     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
700   }
701 }
702
703 /* This method should create a new stream of the given @spec. No playback should
704  * start yet so we start in the corked state. */
705 static gboolean
706 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
707 {
708   GstPulseSink *psink;
709   GstPulseRingBuffer *pbuf;
710   pa_buffer_attr wanted;
711   const pa_buffer_attr *actual;
712   pa_channel_map channel_map;
713   pa_operation *o = NULL;
714 #ifdef HAVE_PULSE_0_9_20
715   pa_cvolume v;
716 #endif
717   pa_cvolume *pv = NULL;
718   pa_stream_flags_t flags;
719   const gchar *name;
720   GstAudioClock *clock;
721
722   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
723   pbuf = GST_PULSERING_BUFFER_CAST (buf);
724
725   GST_LOG_OBJECT (psink, "creating sample spec");
726   /* convert the gstreamer sample spec to the pulseaudio format */
727   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
728     goto invalid_spec;
729
730   pa_threaded_mainloop_lock (mainloop);
731
732   /* we need a context and a no stream */
733   g_assert (pbuf->context);
734   g_assert (!pbuf->stream);
735
736   /* enable event notifications */
737   GST_LOG_OBJECT (psink, "subscribing to context events");
738   if (!(o = pa_context_subscribe (pbuf->context,
739               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
740     goto subscribe_failed;
741
742   pa_operation_unref (o);
743
744   /* initialize the channel map */
745   gst_pulse_gst_to_channel_map (&channel_map, spec);
746
747   /* find a good name for the stream */
748   if (psink->stream_name)
749     name = psink->stream_name;
750   else
751     name = "Playback Stream";
752
753   /* create a stream */
754   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
755   if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context, name,
756               &pbuf->sample_spec, &channel_map, psink->proplist)))
757     goto stream_failed;
758
759   /* install essential callbacks */
760   pa_stream_set_state_callback (pbuf->stream,
761       gst_pulsering_stream_state_cb, pbuf);
762   pa_stream_set_write_callback (pbuf->stream,
763       gst_pulsering_stream_request_cb, pbuf);
764   pa_stream_set_underflow_callback (pbuf->stream,
765       gst_pulsering_stream_underflow_cb, pbuf);
766   pa_stream_set_overflow_callback (pbuf->stream,
767       gst_pulsering_stream_overflow_cb, pbuf);
768   pa_stream_set_latency_update_callback (pbuf->stream,
769       gst_pulsering_stream_latency_cb, pbuf);
770   pa_stream_set_suspended_callback (pbuf->stream,
771       gst_pulsering_stream_suspended_cb, pbuf);
772   pa_stream_set_started_callback (pbuf->stream,
773       gst_pulsering_stream_started_cb, pbuf);
774   pa_stream_set_event_callback (pbuf->stream,
775       gst_pulsering_stream_event_cb, pbuf);
776
777   /* buffering requirements. When setting prebuf to 0, the stream will not pause
778    * when we cause an underrun, which causes time to continue. */
779   memset (&wanted, 0, sizeof (wanted));
780   wanted.tlength = spec->segtotal * spec->segsize;
781   wanted.maxlength = -1;
782   wanted.prebuf = 0;
783   wanted.minreq = spec->segsize;
784
785   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
786   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
787   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
788   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
789
790 #ifdef HAVE_PULSE_0_9_20
791   /* configure volume when we changed it, else we leave the default */
792   if (psink->volume_set) {
793     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
794     pv = &v;
795     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
796         psink->volume);
797   } else {
798     pv = NULL;
799   }
800 #endif
801
802   /* construct the flags */
803   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
804       PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
805
806   if (psink->mute_set && psink->mute)
807     flags |= PA_STREAM_START_MUTED;
808
809   /* we always start corked (see flags above) */
810   pbuf->corked = TRUE;
811
812   /* try to connect now */
813   GST_LOG_OBJECT (psink, "connect for playback to device %s",
814       GST_STR_NULL (psink->device));
815   if (pa_stream_connect_playback (pbuf->stream, psink->device,
816           &wanted, flags, pv, NULL) < 0)
817     goto connect_failed;
818
819   /* our clock will now start from 0 again */
820   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
821   gst_audio_clock_reset (clock, 0);
822
823   for (;;) {
824     pa_stream_state_t state;
825
826     state = pa_stream_get_state (pbuf->stream);
827
828     GST_LOG_OBJECT (psink, "stream state is now %d", state);
829
830     if (!PA_STREAM_IS_GOOD (state))
831       goto connect_failed;
832
833     if (state == PA_STREAM_READY)
834       break;
835
836     /* Wait until the stream is ready */
837     pa_threaded_mainloop_wait (mainloop);
838   }
839
840   /* After we passed the volume off of to PA we never want to set it
841      again, since it is PA's job to save/restore volumes.  */
842   psink->volume_set = psink->mute_set = FALSE;
843
844   GST_LOG_OBJECT (psink, "stream is acquired now");
845
846   /* get the actual buffering properties now */
847   actual = pa_stream_get_buffer_attr (pbuf->stream);
848
849   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
850       wanted.tlength);
851   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
852   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
853   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
854       wanted.minreq);
855
856   spec->segsize = actual->minreq;
857   spec->segtotal = actual->tlength / spec->segsize;
858
859   pa_threaded_mainloop_unlock (mainloop);
860
861   return TRUE;
862
863   /* ERRORS */
864 unlock_and_fail:
865   {
866     gst_pulsering_destroy_stream (pbuf);
867     pa_threaded_mainloop_unlock (mainloop);
868
869     return FALSE;
870   }
871 invalid_spec:
872   {
873     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
874         ("Invalid sample specification."), (NULL));
875     return FALSE;
876   }
877 subscribe_failed:
878   {
879     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
880         ("pa_context_subscribe() failed: %s",
881             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
882     goto unlock_and_fail;
883   }
884 stream_failed:
885   {
886     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
887         ("Failed to create stream: %s",
888             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
889     goto unlock_and_fail;
890   }
891 connect_failed:
892   {
893     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
894         ("Failed to connect stream: %s",
895             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
896     goto unlock_and_fail;
897   }
898 }
899
900 /* free the stream that we acquired before */
901 static gboolean
902 gst_pulseringbuffer_release (GstRingBuffer * buf)
903 {
904   GstPulseRingBuffer *pbuf;
905
906   pbuf = GST_PULSERING_BUFFER_CAST (buf);
907
908   pa_threaded_mainloop_lock (mainloop);
909   gst_pulsering_destroy_stream (pbuf);
910   pa_threaded_mainloop_unlock (mainloop);
911
912   return TRUE;
913 }
914
915 static void
916 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
917 {
918   pa_threaded_mainloop_signal (mainloop, 0);
919 }
920
921 /* update the corked state of a stream, must be called with the mainloop
922  * lock */
923 static gboolean
924 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
925     gboolean wait)
926 {
927   pa_operation *o = NULL;
928   GstPulseSink *psink;
929   gboolean res = FALSE;
930
931   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
932
933   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
934   if (pbuf->corked != corked) {
935     if (!(o = pa_stream_cork (pbuf->stream, corked,
936                 gst_pulsering_success_cb, pbuf)))
937       goto cork_failed;
938
939     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
940       pa_threaded_mainloop_wait (mainloop);
941       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
942         goto server_dead;
943     }
944     pbuf->corked = corked;
945   } else {
946     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
947   }
948   res = TRUE;
949
950 cleanup:
951   if (o)
952     pa_operation_unref (o);
953
954   return res;
955
956   /* ERRORS */
957 server_dead:
958   {
959     GST_DEBUG_OBJECT (psink, "the server is dead");
960     goto cleanup;
961   }
962 cork_failed:
963   {
964     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
965         ("pa_stream_cork() failed: %s",
966             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
967     goto cleanup;
968   }
969 }
970
971 static void
972 gst_pulseringbuffer_clear (GstRingBuffer * buf)
973 {
974   GstPulseSink *psink;
975   GstPulseRingBuffer *pbuf;
976   pa_operation *o = NULL;
977
978   pbuf = GST_PULSERING_BUFFER_CAST (buf);
979   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
980
981   pa_threaded_mainloop_lock (mainloop);
982   GST_DEBUG_OBJECT (psink, "clearing");
983   if (pbuf->stream) {
984     /* don't wait for the flush to complete */
985     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
986       pa_operation_unref (o);
987   }
988   pa_threaded_mainloop_unlock (mainloop);
989 }
990
991 static void
992 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
993 {
994   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
995   GstMessage *message;
996   GValue val = { 0 };
997
998   g_value_init (&val, G_TYPE_POINTER);
999   g_value_set_pointer (&val, g_thread_self ());
1000
1001   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
1002   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1003       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
1004   gst_message_set_stream_status_object (message, &val);
1005
1006   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1007
1008   /* signal the waiter */
1009   pulsesink->pa_defer_ran = TRUE;
1010   pa_threaded_mainloop_signal (mainloop, 0);
1011 }
1012
1013 /* start/resume playback ASAP, we don't uncork here but in the commit method */
1014 static gboolean
1015 gst_pulseringbuffer_start (GstRingBuffer * buf)
1016 {
1017   GstPulseSink *psink;
1018   GstPulseRingBuffer *pbuf;
1019
1020   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1021   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1022
1023   pa_threaded_mainloop_lock (mainloop);
1024
1025   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1026   psink->pa_defer_ran = FALSE;
1027   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1028       mainloop_enter_defer_cb, psink);
1029
1030   GST_DEBUG_OBJECT (psink, "starting");
1031   pbuf->paused = FALSE;
1032
1033   /* EOS needs running clock */
1034   if (GST_BASE_SINK_CAST (psink)->eos ||
1035       g_atomic_int_get (&GST_BASE_AUDIO_SINK (psink)->abidata.ABI.
1036           eos_rendering))
1037     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
1038
1039   pa_threaded_mainloop_unlock (mainloop);
1040
1041   return TRUE;
1042 }
1043
1044 /* pause/stop playback ASAP */
1045 static gboolean
1046 gst_pulseringbuffer_pause (GstRingBuffer * buf)
1047 {
1048   GstPulseSink *psink;
1049   GstPulseRingBuffer *pbuf;
1050   gboolean res;
1051
1052   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1053   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1054
1055   pa_threaded_mainloop_lock (mainloop);
1056   GST_DEBUG_OBJECT (psink, "pausing and corking");
1057   /* make sure the commit method stops writing */
1058   pbuf->paused = TRUE;
1059   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1060   if (pbuf->in_commit) {
1061     /* we are waiting in a commit, signal */
1062     GST_DEBUG_OBJECT (psink, "signal commit");
1063     pa_threaded_mainloop_signal (mainloop, 0);
1064   }
1065   pa_threaded_mainloop_unlock (mainloop);
1066
1067   return res;
1068 }
1069
1070 static void
1071 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1072 {
1073   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1074   GstMessage *message;
1075   GValue val = { 0 };
1076
1077   g_value_init (&val, G_TYPE_POINTER);
1078   g_value_set_pointer (&val, g_thread_self ());
1079
1080   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1081   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1082       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1083   gst_message_set_stream_status_object (message, &val);
1084   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1085
1086   pulsesink->pa_defer_ran = TRUE;
1087   pa_threaded_mainloop_signal (mainloop, 0);
1088   gst_object_unref (pulsesink);
1089 }
1090
1091 /* stop playback, we flush everything. */
1092 static gboolean
1093 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1094 {
1095   GstPulseSink *psink;
1096   GstPulseRingBuffer *pbuf;
1097   gboolean res = FALSE;
1098   pa_operation *o = NULL;
1099
1100   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1101   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1102
1103   pa_threaded_mainloop_lock (mainloop);
1104   pbuf->paused = TRUE;
1105   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1106   /* Inform anyone waiting in _commit() call that it shall wakeup */
1107   if (pbuf->in_commit) {
1108     GST_DEBUG_OBJECT (psink, "signal commit thread");
1109     pa_threaded_mainloop_signal (mainloop, 0);
1110   }
1111
1112   /* then try to flush, it's not fatal when this fails */
1113   GST_DEBUG_OBJECT (psink, "flushing");
1114   if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1115     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1116       GST_DEBUG_OBJECT (psink, "wait for completion");
1117       pa_threaded_mainloop_wait (mainloop);
1118       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1119         goto server_dead;
1120     }
1121     GST_DEBUG_OBJECT (psink, "flush completed");
1122   }
1123   res = TRUE;
1124
1125 cleanup:
1126   if (o) {
1127     pa_operation_cancel (o);
1128     pa_operation_unref (o);
1129   }
1130
1131   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1132   psink->pa_defer_ran = FALSE;
1133   gst_object_ref (psink);
1134   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1135       mainloop_leave_defer_cb, psink);
1136
1137   GST_DEBUG_OBJECT (psink, "waiting for stream status");
1138   pa_threaded_mainloop_unlock (mainloop);
1139
1140   return res;
1141
1142   /* ERRORS */
1143 server_dead:
1144   {
1145     GST_DEBUG_OBJECT (psink, "the server is dead");
1146     goto cleanup;
1147   }
1148 }
1149
1150 /* in_samples >= out_samples, rate > 1.0 */
1151 #define FWD_UP_SAMPLES(s,se,d,de)               \
1152 G_STMT_START {                                  \
1153   guint8 *sb = s, *db = d;                      \
1154   while (s <= se && d < de) {                   \
1155     memcpy (d, s, bps);                         \
1156     s += bps;                                   \
1157     *accum += outr;                             \
1158     if ((*accum << 1) >= inr) {                 \
1159       *accum -= inr;                            \
1160       d += bps;                                 \
1161     }                                           \
1162   }                                             \
1163   in_samples -= (s - sb)/bps;                   \
1164   out_samples -= (d - db)/bps;                  \
1165   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1166 } G_STMT_END
1167
1168 /* out_samples > in_samples, for rates smaller than 1.0 */
1169 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1170 G_STMT_START {                                  \
1171   guint8 *sb = s, *db = d;                      \
1172   while (s <= se && d < de) {                   \
1173     memcpy (d, s, bps);                         \
1174     d += bps;                                   \
1175     *accum += inr;                              \
1176     if ((*accum << 1) >= outr) {                \
1177       *accum -= outr;                           \
1178       s += bps;                                 \
1179     }                                           \
1180   }                                             \
1181   in_samples -= (s - sb)/bps;                   \
1182   out_samples -= (d - db)/bps;                  \
1183   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1184 } G_STMT_END
1185
1186 #define REV_UP_SAMPLES(s,se,d,de)               \
1187 G_STMT_START {                                  \
1188   guint8 *sb = se, *db = d;                     \
1189   while (s <= se && d < de) {                   \
1190     memcpy (d, se, bps);                        \
1191     se -= bps;                                  \
1192     *accum += outr;                             \
1193     while (d < de && (*accum << 1) >= inr) {    \
1194       *accum -= inr;                            \
1195       d += bps;                                 \
1196     }                                           \
1197   }                                             \
1198   in_samples -= (sb - se)/bps;                  \
1199   out_samples -= (d - db)/bps;                  \
1200   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1201 } G_STMT_END
1202
1203 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1204 G_STMT_START {                                  \
1205   guint8 *sb = se, *db = d;                     \
1206   while (s <= se && d < de) {                   \
1207     memcpy (d, se, bps);                        \
1208     d += bps;                                   \
1209     *accum += inr;                              \
1210     while (s <= se && (*accum << 1) >= outr) {  \
1211       *accum -= outr;                           \
1212       se -= bps;                                \
1213     }                                           \
1214   }                                             \
1215   in_samples -= (sb - se)/bps;                  \
1216   out_samples -= (d - db)/bps;                  \
1217   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1218 } G_STMT_END
1219
1220
1221 /* our custom commit function because we write into the buffer of pulseaudio
1222  * instead of keeping our own buffer */
1223 static guint
1224 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1225     guchar * data, gint in_samples, gint out_samples, gint * accum)
1226 {
1227   GstPulseSink *psink;
1228   GstPulseRingBuffer *pbuf;
1229   guint result;
1230   guint8 *data_end;
1231   gboolean reverse;
1232   gint *toprocess;
1233   gint inr, outr, bps;
1234   gint64 offset;
1235   guint bufsize;
1236
1237   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1238   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1239
1240   /* FIXME post message rather than using a signal (as mixer interface) */
1241   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1242     g_object_notify (G_OBJECT (psink), "volume");
1243     g_object_notify (G_OBJECT (psink), "mute");
1244   }
1245
1246   /* make sure the ringbuffer is started */
1247   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1248           GST_RING_BUFFER_STATE_STARTED)) {
1249     /* see if we are allowed to start it */
1250     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
1251       goto no_start;
1252
1253     GST_DEBUG_OBJECT (buf, "start!");
1254     if (!gst_ring_buffer_start (buf))
1255       goto start_failed;
1256   }
1257
1258   pa_threaded_mainloop_lock (mainloop);
1259   GST_DEBUG_OBJECT (psink, "entering commit");
1260   pbuf->in_commit = TRUE;
1261
1262   bps = buf->spec.bytes_per_sample;
1263   bufsize = buf->spec.segsize * buf->spec.segtotal;
1264
1265   /* our toy resampler for trick modes */
1266   reverse = out_samples < 0;
1267   out_samples = ABS (out_samples);
1268
1269   if (in_samples >= out_samples)
1270     toprocess = &in_samples;
1271   else
1272     toprocess = &out_samples;
1273
1274   inr = in_samples - 1;
1275   outr = out_samples - 1;
1276
1277   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1278
1279   /* data_end points to the last sample we have to write, not past it. This is
1280    * needed to properly handle reverse playback: it points to the last sample. */
1281   data_end = data + (bps * inr);
1282
1283   if (pbuf->paused)
1284     goto was_paused;
1285
1286   /* offset is in bytes */
1287   offset = *sample * bps;
1288
1289   while (*toprocess > 0) {
1290     size_t avail;
1291     guint towrite;
1292
1293     GST_LOG_OBJECT (psink,
1294         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1295         offset);
1296
1297     if (offset != pbuf->m_lastoffset)
1298       GST_LOG_OBJECT (psink, "discontinuity, offset is %" G_GINT64_FORMAT ", "
1299           "last offset was %" G_GINT64_FORMAT, offset, pbuf->m_lastoffset);
1300
1301     towrite = out_samples * bps;
1302
1303     /* Only ever write segsize bytes at once. This will
1304      * also limit the PA shm buffer to segsize
1305      */
1306     if (towrite > buf->spec.segsize)
1307       towrite = buf->spec.segsize;
1308
1309     if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
1310       /* if no room left or discontinuity in offset,
1311          we need to flush data and get a new buffer */
1312
1313       /* flush the buffer if possible */
1314       if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1315
1316         GST_LOG_OBJECT (psink,
1317             "flushing %u samples at offset %" G_GINT64_FORMAT,
1318             (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1319
1320         if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1321                 pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1322           goto write_failed;
1323         }
1324       }
1325       pbuf->m_towrite = 0;
1326       pbuf->m_offset = offset;  /* keep track of current offset */
1327
1328       /* get a buffer to write in for now on */
1329       for (;;) {
1330         pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
1331
1332         if (pbuf->m_writable == (size_t) - 1)
1333           goto writable_size_failed;
1334
1335         pbuf->m_writable /= bps;
1336         pbuf->m_writable *= bps;        /* handle only complete samples */
1337
1338         if (pbuf->m_writable >= towrite)
1339           break;
1340
1341         /* see if we need to uncork because we have no free space */
1342         if (pbuf->corked) {
1343           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1344             goto uncork_failed;
1345         }
1346
1347         /* we can't write a single byte, wait a bit */
1348         GST_LOG_OBJECT (psink, "waiting for free space");
1349         pa_threaded_mainloop_wait (mainloop);
1350
1351         if (pbuf->paused)
1352           goto was_paused;
1353       }
1354
1355       /* make sure we only buffer up latency-time samples */
1356       if (pbuf->m_writable > buf->spec.segsize) {
1357         /* limit buffering to latency-time value */
1358         pbuf->m_writable = buf->spec.segsize;
1359
1360         GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
1361             pbuf->m_writable);
1362       }
1363
1364       GST_LOG_OBJECT (psink, "requesting %" G_GSIZE_FORMAT " bytes of "
1365           "shared memory", pbuf->m_writable);
1366
1367       if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
1368               &pbuf->m_writable) < 0) {
1369         GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
1370         goto writable_size_failed;
1371       }
1372
1373       GST_LOG_OBJECT (psink, "got %" G_GSIZE_FORMAT " bytes of shared memory",
1374           pbuf->m_writable);
1375
1376       /* Just to make sure that we didn't get more than requested */
1377       if (pbuf->m_writable > buf->spec.segsize) {
1378         /* limit buffering to latency-time value */
1379         pbuf->m_writable = buf->spec.segsize;
1380       }
1381     }
1382
1383     if (pbuf->m_writable < towrite)
1384       towrite = pbuf->m_writable;
1385     avail = towrite / bps;
1386
1387     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1388         (guint) avail, offset);
1389
1390     if (G_LIKELY (inr == outr && !reverse)) {
1391       /* no rate conversion, simply write out the samples */
1392       /* copy the data into internal buffer */
1393
1394       memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
1395       pbuf->m_towrite += towrite;
1396       pbuf->m_writable -= towrite;
1397
1398       data += towrite;
1399       in_samples -= avail;
1400       out_samples -= avail;
1401     } else {
1402       guint8 *dest, *d, *d_end;
1403
1404       /* write into the PulseAudio shm buffer */
1405       dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
1406       d_end = d + towrite;
1407
1408       if (!reverse) {
1409         if (inr >= outr)
1410           /* forward speed up */
1411           FWD_UP_SAMPLES (data, data_end, d, d_end);
1412         else
1413           /* forward slow down */
1414           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1415       } else {
1416         if (inr >= outr)
1417           /* reverse speed up */
1418           REV_UP_SAMPLES (data, data_end, d, d_end);
1419         else
1420           /* reverse slow down */
1421           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1422       }
1423       /* see what we have left to write */
1424       towrite = (d - dest);
1425       pbuf->m_towrite += towrite;
1426       pbuf->m_writable -= towrite;
1427
1428       avail = towrite / bps;
1429     }
1430
1431     /* flush the buffer if it's full */
1432     if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)
1433         && (pbuf->m_writable == 0)) {
1434       GST_LOG_OBJECT (psink, "flushing %u samples at offset %" G_GINT64_FORMAT,
1435           (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1436
1437       if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1438               pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1439         goto write_failed;
1440       }
1441       pbuf->m_towrite = 0;
1442       pbuf->m_offset = offset + towrite;        /* keep track of current offset */
1443     }
1444
1445     *sample += avail;
1446     offset += avail * bps;
1447     pbuf->m_lastoffset = offset;
1448
1449     /* check if we need to uncork after writing the samples */
1450     if (pbuf->corked) {
1451       const pa_timing_info *info;
1452
1453       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1454         GST_LOG_OBJECT (psink,
1455             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1456             info->read_index, offset);
1457
1458         /* we uncork when the read_index is too far behind the offset we need
1459          * to write to. */
1460         if (info->read_index + bufsize <= offset) {
1461           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1462             goto uncork_failed;
1463         }
1464       } else {
1465         GST_LOG_OBJECT (psink, "no timing info available yet");
1466       }
1467     }
1468   }
1469   /* we consumed all samples here */
1470   data = data_end + bps;
1471
1472   pbuf->in_commit = FALSE;
1473   pa_threaded_mainloop_unlock (mainloop);
1474
1475 done:
1476   result = inr - ((data_end - data) / bps);
1477   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1478
1479   return result;
1480
1481   /* ERRORS */
1482 unlock_and_fail:
1483   {
1484     pbuf->in_commit = FALSE;
1485     GST_LOG_OBJECT (psink, "we are reset");
1486     pa_threaded_mainloop_unlock (mainloop);
1487     goto done;
1488   }
1489 no_start:
1490   {
1491     GST_LOG_OBJECT (psink, "we can not start");
1492     return 0;
1493   }
1494 start_failed:
1495   {
1496     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1497     return 0;
1498   }
1499 uncork_failed:
1500   {
1501     pbuf->in_commit = FALSE;
1502     GST_ERROR_OBJECT (psink, "uncork failed");
1503     pa_threaded_mainloop_unlock (mainloop);
1504     goto done;
1505   }
1506 was_paused:
1507   {
1508     pbuf->in_commit = FALSE;
1509     GST_LOG_OBJECT (psink, "we are paused");
1510     pa_threaded_mainloop_unlock (mainloop);
1511     goto done;
1512   }
1513 writable_size_failed:
1514   {
1515     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1516         ("pa_stream_writable_size() failed: %s",
1517             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1518     goto unlock_and_fail;
1519   }
1520 write_failed:
1521   {
1522     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1523         ("pa_stream_write() failed: %s",
1524             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1525     goto unlock_and_fail;
1526   }
1527 }
1528
1529 /* write pending local samples, must be called with the mainloop lock */
1530 static void
1531 gst_pulsering_flush (GstPulseRingBuffer * pbuf)
1532 {
1533   GstPulseSink *psink;
1534
1535   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1536   GST_DEBUG_OBJECT (psink, "entering flush");
1537
1538   /* flush the buffer if possible */
1539   if (pbuf->stream && (pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1540 #ifndef GST_DISABLE_GST_DEBUG
1541     gint bps;
1542
1543     bps = (GST_RING_BUFFER_CAST (pbuf))->spec.bytes_per_sample;
1544     GST_LOG_OBJECT (psink,
1545         "flushing %u samples at offset %" G_GINT64_FORMAT,
1546         (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1547 #endif
1548
1549     if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1550             pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1551       goto write_failed;
1552     }
1553
1554     pbuf->m_towrite = 0;
1555     pbuf->m_offset += pbuf->m_towrite;  /* keep track of current offset */
1556   }
1557
1558 done:
1559   return;
1560
1561   /* ERRORS */
1562 write_failed:
1563   {
1564     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1565         ("pa_stream_write() failed: %s",
1566             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1567     goto done;
1568   }
1569 }
1570
1571 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1572     const GValue * value, GParamSpec * pspec);
1573 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1574     GValue * value, GParamSpec * pspec);
1575 static void gst_pulsesink_finalize (GObject * object);
1576
1577 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1578
1579 static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element,
1580     GstStateChange transition);
1581
1582 static void gst_pulsesink_init_interfaces (GType type);
1583
1584 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1585 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1586 #else
1587 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1588 #endif
1589
1590 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1591
1592 #define _do_init(type) \
1593   gst_pulsesink_init_contexts (); \
1594   gst_pulsesink_init_interfaces (type);
1595
1596 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1597     GST_TYPE_BASE_AUDIO_SINK, _do_init);
1598
1599 static gboolean
1600 gst_pulsesink_interface_supported (GstImplementsInterface *
1601     iface, GType interface_type)
1602 {
1603   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1604
1605   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1606     return TRUE;
1607   if (interface_type == GST_TYPE_STREAM_VOLUME)
1608     return TRUE;
1609
1610   return FALSE;
1611 }
1612
1613 static void
1614 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1615 {
1616   klass->supported = gst_pulsesink_interface_supported;
1617 }
1618
1619 static void
1620 gst_pulsesink_init_interfaces (GType type)
1621 {
1622   static const GInterfaceInfo implements_iface_info = {
1623     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1624     NULL,
1625     NULL,
1626   };
1627   static const GInterfaceInfo probe_iface_info = {
1628     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1629     NULL,
1630     NULL,
1631   };
1632   static const GInterfaceInfo svol_iface_info = {
1633     NULL, NULL, NULL
1634   };
1635
1636   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
1637   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1638       &implements_iface_info);
1639   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1640       &probe_iface_info);
1641 }
1642
1643 static void
1644 gst_pulsesink_base_init (gpointer g_class)
1645 {
1646   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1647       GST_PAD_SINK,
1648       GST_PAD_ALWAYS,
1649       GST_STATIC_CAPS ("audio/x-raw-int, "
1650           "endianness = (int) { " ENDIANNESS " }, "
1651           "signed = (boolean) TRUE, "
1652           "width = (int) 16, "
1653           "depth = (int) 16, "
1654           "rate = (int) [ 1, MAX ], "
1655           "channels = (int) [ 1, 32 ];"
1656           "audio/x-raw-float, "
1657           "endianness = (int) { " ENDIANNESS " }, "
1658           "width = (int) 32, "
1659           "rate = (int) [ 1, MAX ], "
1660           "channels = (int) [ 1, 32 ];"
1661           "audio/x-raw-int, "
1662           "endianness = (int) { " ENDIANNESS " }, "
1663           "signed = (boolean) TRUE, "
1664           "width = (int) 32, "
1665           "depth = (int) 32, "
1666           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1667           "audio/x-raw-int, "
1668           "endianness = (int) { " ENDIANNESS " }, "
1669           "signed = (boolean) TRUE, "
1670           "width = (int) 24, "
1671           "depth = (int) 24, "
1672           "rate = (int) [ 1, MAX ], "
1673           "channels = (int) [ 1, 32 ];"
1674           "audio/x-raw-int, "
1675           "endianness = (int) { " ENDIANNESS " }, "
1676           "signed = (boolean) TRUE, "
1677           "width = (int) 32, "
1678           "depth = (int) 24, "
1679           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1680           "audio/x-raw-int, "
1681           "signed = (boolean) FALSE, "
1682           "width = (int) 8, "
1683           "depth = (int) 8, "
1684           "rate = (int) [ 1, MAX ], "
1685           "channels = (int) [ 1, 32 ];"
1686           "audio/x-alaw, "
1687           "rate = (int) [ 1, MAX], "
1688           "channels = (int) [ 1, 32 ];"
1689           "audio/x-mulaw, "
1690           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1691       );
1692
1693   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1694
1695   gst_element_class_set_details_simple (element_class,
1696       "PulseAudio Audio Sink",
1697       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1698   gst_element_class_add_pad_template (element_class,
1699       gst_static_pad_template_get (&pad_template));
1700 }
1701
1702 static GstRingBuffer *
1703 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1704 {
1705   GstRingBuffer *buffer;
1706
1707   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1708   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1709   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1710
1711   return buffer;
1712 }
1713
1714 static void
1715 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1716 {
1717   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1718   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1719   GstBaseSinkClass *bc;
1720   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1721   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1722
1723   gobject_class->finalize = gst_pulsesink_finalize;
1724   gobject_class->set_property = gst_pulsesink_set_property;
1725   gobject_class->get_property = gst_pulsesink_get_property;
1726
1727   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1728
1729   /* restore the original basesink pull methods */
1730   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1731   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1732
1733   gstelement_class->change_state =
1734       GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
1735
1736   gstaudiosink_class->create_ringbuffer =
1737       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1738
1739   /* Overwrite GObject fields */
1740   g_object_class_install_property (gobject_class,
1741       PROP_SERVER,
1742       g_param_spec_string ("server", "Server",
1743           "The PulseAudio server to connect to", DEFAULT_SERVER,
1744           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1745
1746   g_object_class_install_property (gobject_class, PROP_DEVICE,
1747       g_param_spec_string ("device", "Device",
1748           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1749           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1750
1751   g_object_class_install_property (gobject_class,
1752       PROP_DEVICE_NAME,
1753       g_param_spec_string ("device-name", "Device name",
1754           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1755           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1756
1757   g_object_class_install_property (gobject_class,
1758       PROP_VOLUME,
1759       g_param_spec_double ("volume", "Volume",
1760           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1761           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1762   g_object_class_install_property (gobject_class,
1763       PROP_MUTE,
1764       g_param_spec_boolean ("mute", "Mute",
1765           "Mute state of this stream", DEFAULT_MUTE,
1766           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1767
1768   /**
1769    * GstPulseSink:client
1770    *
1771    * The PulseAudio client name to use.
1772    *
1773    * Since: 0.10.25
1774    */
1775   g_object_class_install_property (gobject_class,
1776       PROP_CLIENT,
1777       g_param_spec_string ("client", "Client",
1778           "The PulseAudio client name to use", gst_pulse_client_name (),
1779           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1780           GST_PARAM_MUTABLE_READY));
1781
1782   /**
1783    * GstPulseSink:stream-properties
1784    *
1785    * List of pulseaudio stream properties. A list of defined properties can be
1786    * found in the <ulink url="http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html">pulseaudio api docs</ulink>.
1787    *
1788    * Below is an example for registering as a music application to pulseaudio.
1789    * |[
1790    * GstStructure *props;
1791    *
1792    * props = gst_structure_from_string ("props,media.role=music", NULL);
1793    * g_object_set (pulse, "stream-properties", props, NULL);
1794    * gst_structure_free
1795    * ]|
1796    *
1797    * Since: 0.10.26
1798    */
1799   g_object_class_install_property (gobject_class,
1800       PROP_STREAM_PROPERTIES,
1801       g_param_spec_boxed ("stream-properties", "stream properties",
1802           "list of pulseaudio stream properties",
1803           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1804 }
1805
1806 /* returns the current time of the sink ringbuffer */
1807 static GstClockTime
1808 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1809 {
1810   GstPulseSink *psink;
1811   GstPulseRingBuffer *pbuf;
1812   pa_usec_t time;
1813
1814   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
1815     return GST_CLOCK_TIME_NONE;
1816
1817   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1818   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1819
1820   pa_threaded_mainloop_lock (mainloop);
1821   if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1822     goto server_dead;
1823
1824   /* if we don't have enough data to get a timestamp, just return NONE, which
1825    * will return the last reported time */
1826   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1827     GST_DEBUG_OBJECT (psink, "could not get time");
1828     time = GST_CLOCK_TIME_NONE;
1829   } else
1830     time *= 1000;
1831   pa_threaded_mainloop_unlock (mainloop);
1832
1833   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1834       GST_TIME_ARGS (time));
1835
1836   return time;
1837
1838   /* ERRORS */
1839 server_dead:
1840   {
1841     GST_DEBUG_OBJECT (psink, "the server is dead");
1842     pa_threaded_mainloop_unlock (mainloop);
1843
1844     return GST_CLOCK_TIME_NONE;
1845   }
1846 }
1847
1848 static void
1849 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
1850 {
1851   pulsesink->server = NULL;
1852   pulsesink->device = NULL;
1853   pulsesink->device_description = NULL;
1854   pulsesink->client_name = gst_pulse_client_name ();
1855
1856   pulsesink->volume = DEFAULT_VOLUME;
1857   pulsesink->volume_set = FALSE;
1858
1859   pulsesink->mute = DEFAULT_MUTE;
1860   pulsesink->mute_set = FALSE;
1861
1862   pulsesink->notify = 0;
1863
1864   pulsesink->properties = NULL;
1865   pulsesink->proplist = NULL;
1866
1867   /* override with a custom clock */
1868   if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
1869     gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
1870
1871   GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
1872       gst_audio_clock_new ("GstPulseSinkClock",
1873       (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
1874
1875   /* TRUE for sinks, FALSE for sources */
1876   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1877       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1878       TRUE, FALSE);
1879 }
1880
1881 static void
1882 gst_pulsesink_finalize (GObject * object)
1883 {
1884   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1885
1886   g_free (pulsesink->server);
1887   g_free (pulsesink->device);
1888   g_free (pulsesink->device_description);
1889   g_free (pulsesink->client_name);
1890
1891   if (pulsesink->properties)
1892     gst_structure_free (pulsesink->properties);
1893   if (pulsesink->proplist)
1894     pa_proplist_free (pulsesink->proplist);
1895
1896   if (pulsesink->probe) {
1897     gst_pulseprobe_free (pulsesink->probe);
1898     pulsesink->probe = NULL;
1899   }
1900
1901   G_OBJECT_CLASS (parent_class)->finalize (object);
1902 }
1903
1904 static void
1905 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1906 {
1907   pa_cvolume v;
1908   pa_operation *o = NULL;
1909   GstPulseRingBuffer *pbuf;
1910   uint32_t idx;
1911
1912   if (!mainloop)
1913     goto no_mainloop;
1914
1915   pa_threaded_mainloop_lock (mainloop);
1916
1917   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1918
1919   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1920   if (pbuf == NULL || pbuf->stream == NULL)
1921     goto no_buffer;
1922
1923   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1924     goto no_index;
1925
1926   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1927
1928   if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
1929               &v, NULL, NULL)))
1930     goto volume_failed;
1931
1932   /* We don't really care about the result of this call */
1933 unlock:
1934
1935   if (o)
1936     pa_operation_unref (o);
1937
1938   pa_threaded_mainloop_unlock (mainloop);
1939
1940   return;
1941
1942   /* ERRORS */
1943 no_mainloop:
1944   {
1945     psink->volume = volume;
1946     psink->volume_set = TRUE;
1947
1948     GST_DEBUG_OBJECT (psink, "we have no mainloop");
1949     return;
1950   }
1951 no_buffer:
1952   {
1953     psink->volume = volume;
1954     psink->volume_set = TRUE;
1955
1956     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1957     goto unlock;
1958   }
1959 no_index:
1960   {
1961     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1962     goto unlock;
1963   }
1964 volume_failed:
1965   {
1966     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1967         ("pa_stream_set_sink_input_volume() failed: %s",
1968             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1969     goto unlock;
1970   }
1971 }
1972
1973 static void
1974 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
1975 {
1976   pa_operation *o = NULL;
1977   GstPulseRingBuffer *pbuf;
1978   uint32_t idx;
1979
1980   if (!mainloop)
1981     goto no_mainloop;
1982
1983   pa_threaded_mainloop_lock (mainloop);
1984
1985   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
1986
1987   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1988   if (pbuf == NULL || pbuf->stream == NULL)
1989     goto no_buffer;
1990
1991   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1992     goto no_index;
1993
1994   if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
1995               mute, NULL, NULL)))
1996     goto mute_failed;
1997
1998   /* We don't really care about the result of this call */
1999 unlock:
2000
2001   if (o)
2002     pa_operation_unref (o);
2003
2004   pa_threaded_mainloop_unlock (mainloop);
2005
2006   return;
2007
2008   /* ERRORS */
2009 no_mainloop:
2010   {
2011     psink->mute = mute;
2012     psink->mute_set = TRUE;
2013
2014     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2015     return;
2016   }
2017 no_buffer:
2018   {
2019     psink->mute = mute;
2020     psink->mute_set = TRUE;
2021
2022     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2023     goto unlock;
2024   }
2025 no_index:
2026   {
2027     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2028     goto unlock;
2029   }
2030 mute_failed:
2031   {
2032     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2033         ("pa_stream_set_sink_input_mute() failed: %s",
2034             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2035     goto unlock;
2036   }
2037 }
2038
2039 static void
2040 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
2041     int eol, void *userdata)
2042 {
2043   GstPulseRingBuffer *pbuf;
2044   GstPulseSink *psink;
2045
2046   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2047   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2048
2049   if (!i)
2050     goto done;
2051
2052   if (!pbuf->stream)
2053     goto done;
2054
2055   /* If the index doesn't match our current stream,
2056    * it implies we just recreated the stream (caps change)
2057    */
2058   if (i->index == pa_stream_get_index (pbuf->stream)) {
2059     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
2060     psink->mute = i->mute;
2061   }
2062
2063 done:
2064   pa_threaded_mainloop_signal (mainloop, 0);
2065 }
2066
2067 static gdouble
2068 gst_pulsesink_get_volume (GstPulseSink * psink)
2069 {
2070   GstPulseRingBuffer *pbuf;
2071   pa_operation *o = NULL;
2072   gdouble v = DEFAULT_VOLUME;
2073   uint32_t idx;
2074
2075   if (!mainloop)
2076     goto no_mainloop;
2077
2078   pa_threaded_mainloop_lock (mainloop);
2079
2080   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2081   if (pbuf == NULL || pbuf->stream == NULL)
2082     goto no_buffer;
2083
2084   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2085     goto no_index;
2086
2087   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2088               gst_pulsesink_sink_input_info_cb, pbuf)))
2089     goto info_failed;
2090
2091   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2092     pa_threaded_mainloop_wait (mainloop);
2093     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2094       goto unlock;
2095   }
2096
2097 unlock:
2098   v = psink->volume;
2099
2100   if (o)
2101     pa_operation_unref (o);
2102
2103   pa_threaded_mainloop_unlock (mainloop);
2104
2105   if (v > MAX_VOLUME) {
2106     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
2107     v = MAX_VOLUME;
2108   }
2109
2110   return v;
2111
2112   /* ERRORS */
2113 no_mainloop:
2114   {
2115     v = psink->volume;
2116     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2117     return v;
2118   }
2119 no_buffer:
2120   {
2121     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2122     goto unlock;
2123   }
2124 no_index:
2125   {
2126     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2127     goto unlock;
2128   }
2129 info_failed:
2130   {
2131     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2132         ("pa_context_get_sink_input_info() failed: %s",
2133             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2134     goto unlock;
2135   }
2136 }
2137
2138 static gboolean
2139 gst_pulsesink_get_mute (GstPulseSink * psink)
2140 {
2141   GstPulseRingBuffer *pbuf;
2142   pa_operation *o = NULL;
2143   uint32_t idx;
2144   gboolean mute = FALSE;
2145
2146   if (!mainloop)
2147     goto no_mainloop;
2148
2149   pa_threaded_mainloop_lock (mainloop);
2150   mute = psink->mute;
2151
2152   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2153   if (pbuf == NULL || pbuf->stream == NULL)
2154     goto no_buffer;
2155
2156   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2157     goto no_index;
2158
2159   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2160               gst_pulsesink_sink_input_info_cb, pbuf)))
2161     goto info_failed;
2162
2163   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2164     pa_threaded_mainloop_wait (mainloop);
2165     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2166       goto unlock;
2167   }
2168
2169 unlock:
2170   if (o)
2171     pa_operation_unref (o);
2172
2173   pa_threaded_mainloop_unlock (mainloop);
2174
2175   return mute;
2176
2177   /* ERRORS */
2178 no_mainloop:
2179   {
2180     mute = psink->mute;
2181     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2182     return mute;
2183   }
2184 no_buffer:
2185   {
2186     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2187     goto unlock;
2188   }
2189 no_index:
2190   {
2191     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2192     goto unlock;
2193   }
2194 info_failed:
2195   {
2196     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2197         ("pa_context_get_sink_input_info() failed: %s",
2198             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2199     goto unlock;
2200   }
2201 }
2202
2203 static void
2204 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
2205     void *userdata)
2206 {
2207   GstPulseRingBuffer *pbuf;
2208   GstPulseSink *psink;
2209
2210   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2211   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2212
2213   if (!i)
2214     goto done;
2215
2216   g_free (psink->device_description);
2217   psink->device_description = g_strdup (i->description);
2218
2219 done:
2220   pa_threaded_mainloop_signal (mainloop, 0);
2221 }
2222
2223 static gchar *
2224 gst_pulsesink_device_description (GstPulseSink * psink)
2225 {
2226   GstPulseRingBuffer *pbuf;
2227   pa_operation *o = NULL;
2228   gchar *t;
2229
2230   if (!mainloop)
2231     goto no_mainloop;
2232
2233   pa_threaded_mainloop_lock (mainloop);
2234   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2235   if (pbuf == NULL)
2236     goto no_buffer;
2237
2238   if (!(o = pa_context_get_sink_info_by_name (pbuf->context,
2239               psink->device, gst_pulsesink_sink_info_cb, pbuf)))
2240     goto info_failed;
2241
2242   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2243     pa_threaded_mainloop_wait (mainloop);
2244     if (gst_pulsering_is_dead (psink, pbuf, FALSE))
2245       goto unlock;
2246   }
2247
2248 unlock:
2249   if (o)
2250     pa_operation_unref (o);
2251
2252   t = g_strdup (psink->device_description);
2253   pa_threaded_mainloop_unlock (mainloop);
2254
2255   return t;
2256
2257   /* ERRORS */
2258 no_mainloop:
2259   {
2260     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2261     return NULL;
2262   }
2263 no_buffer:
2264   {
2265     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2266     goto unlock;
2267   }
2268 info_failed:
2269   {
2270     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2271         ("pa_context_get_sink_info_by_index() failed: %s",
2272             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2273     goto unlock;
2274   }
2275 }
2276
2277 static void
2278 gst_pulsesink_set_property (GObject * object,
2279     guint prop_id, const GValue * value, GParamSpec * pspec)
2280 {
2281   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2282
2283   switch (prop_id) {
2284     case PROP_SERVER:
2285       g_free (pulsesink->server);
2286       pulsesink->server = g_value_dup_string (value);
2287       if (pulsesink->probe)
2288         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2289       break;
2290     case PROP_DEVICE:
2291       g_free (pulsesink->device);
2292       pulsesink->device = g_value_dup_string (value);
2293       break;
2294     case PROP_VOLUME:
2295       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2296       break;
2297     case PROP_MUTE:
2298       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2299       break;
2300     case PROP_CLIENT:
2301       g_free (pulsesink->client_name);
2302       if (!g_value_get_string (value)) {
2303         GST_WARNING_OBJECT (pulsesink,
2304             "Empty PulseAudio client name not allowed. Resetting to default value");
2305         pulsesink->client_name = gst_pulse_client_name ();
2306       } else
2307         pulsesink->client_name = g_value_dup_string (value);
2308       break;
2309     case PROP_STREAM_PROPERTIES:
2310       if (pulsesink->properties)
2311         gst_structure_free (pulsesink->properties);
2312       pulsesink->properties =
2313           gst_structure_copy (gst_value_get_structure (value));
2314       if (pulsesink->proplist)
2315         pa_proplist_free (pulsesink->proplist);
2316       pulsesink->proplist = gst_pulse_make_proplist (pulsesink->properties);
2317       break;
2318     default:
2319       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2320       break;
2321   }
2322 }
2323
2324 static void
2325 gst_pulsesink_get_property (GObject * object,
2326     guint prop_id, GValue * value, GParamSpec * pspec)
2327 {
2328
2329   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2330
2331   switch (prop_id) {
2332     case PROP_SERVER:
2333       g_value_set_string (value, pulsesink->server);
2334       break;
2335     case PROP_DEVICE:
2336       g_value_set_string (value, pulsesink->device);
2337       break;
2338     case PROP_DEVICE_NAME:
2339       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2340       break;
2341     case PROP_VOLUME:
2342       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2343       break;
2344     case PROP_MUTE:
2345       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2346       break;
2347     case PROP_CLIENT:
2348       g_value_set_string (value, pulsesink->client_name);
2349       break;
2350     case PROP_STREAM_PROPERTIES:
2351       gst_value_set_structure (value, pulsesink->properties);
2352       break;
2353     default:
2354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2355       break;
2356   }
2357 }
2358
2359 static void
2360 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2361 {
2362   pa_operation *o = NULL;
2363   GstPulseRingBuffer *pbuf;
2364
2365   pa_threaded_mainloop_lock (mainloop);
2366
2367   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2368
2369   if (pbuf == NULL || pbuf->stream == NULL)
2370     goto no_buffer;
2371
2372   g_free (pbuf->stream_name);
2373   pbuf->stream_name = g_strdup (t);
2374
2375   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2376     goto name_failed;
2377
2378   /* We're not interested if this operation failed or not */
2379 unlock:
2380
2381   if (o)
2382     pa_operation_unref (o);
2383   pa_threaded_mainloop_unlock (mainloop);
2384
2385   return;
2386
2387   /* ERRORS */
2388 no_buffer:
2389   {
2390     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2391     goto unlock;
2392   }
2393 name_failed:
2394   {
2395     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2396         ("pa_stream_set_name() failed: %s",
2397             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2398     goto unlock;
2399   }
2400 }
2401
2402 static void
2403 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2404 {
2405   static const gchar *const map[] = {
2406     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2407
2408     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2409     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2410
2411     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2412     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2413     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2414     /* We might add more here later on ... */
2415     NULL
2416   };
2417   pa_proplist *pl = NULL;
2418   const gchar *const *t;
2419   gboolean empty = TRUE;
2420   pa_operation *o = NULL;
2421   GstPulseRingBuffer *pbuf;
2422
2423   pl = pa_proplist_new ();
2424
2425   for (t = map; *t; t += 2) {
2426     gchar *n = NULL;
2427
2428     if (gst_tag_list_get_string (l, *t, &n)) {
2429
2430       if (n && *n) {
2431         pa_proplist_sets (pl, *(t + 1), n);
2432         empty = FALSE;
2433       }
2434
2435       g_free (n);
2436     }
2437   }
2438   if (empty)
2439     goto finish;
2440
2441   pa_threaded_mainloop_lock (mainloop);
2442   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2443   if (pbuf == NULL || pbuf->stream == NULL)
2444     goto no_buffer;
2445
2446   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2447               pl, NULL, NULL)))
2448     goto update_failed;
2449
2450   /* We're not interested if this operation failed or not */
2451 unlock:
2452
2453   if (o)
2454     pa_operation_unref (o);
2455
2456   pa_threaded_mainloop_unlock (mainloop);
2457
2458 finish:
2459
2460   if (pl)
2461     pa_proplist_free (pl);
2462
2463   return;
2464
2465   /* ERRORS */
2466 no_buffer:
2467   {
2468     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2469     goto unlock;
2470   }
2471 update_failed:
2472   {
2473     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2474         ("pa_stream_proplist_update() failed: %s",
2475             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2476     goto unlock;
2477   }
2478 }
2479
2480 static void
2481 gst_pulsesink_flush_ringbuffer (GstPulseSink * psink)
2482 {
2483   GstPulseRingBuffer *pbuf;
2484
2485   pa_threaded_mainloop_lock (mainloop);
2486
2487   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2488
2489   if (pbuf == NULL || pbuf->stream == NULL)
2490     goto no_buffer;
2491
2492   gst_pulsering_flush (pbuf);
2493
2494   /* Uncork if we haven't already (happens when waiting to get enough data
2495    * to send out the first time) */
2496   if (pbuf->corked)
2497     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
2498
2499   /* We're not interested if this operation failed or not */
2500 unlock:
2501   pa_threaded_mainloop_unlock (mainloop);
2502
2503   return;
2504
2505   /* ERRORS */
2506 no_buffer:
2507   {
2508     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2509     goto unlock;
2510   }
2511 }
2512
2513 static gboolean
2514 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2515 {
2516   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2517
2518   switch (GST_EVENT_TYPE (event)) {
2519     case GST_EVENT_TAG:{
2520       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2521           NULL, *t = NULL, *buf = NULL;
2522       GstTagList *l;
2523
2524       gst_event_parse_tag (event, &l);
2525
2526       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2527       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2528       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2529       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2530
2531       if (!artist)
2532         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2533
2534       if (title && artist)
2535         /* TRANSLATORS: 'song title' by 'artist name' */
2536         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2537             g_strstrip (artist));
2538       else if (title)
2539         t = g_strstrip (title);
2540       else if (description)
2541         t = g_strstrip (description);
2542       else if (location)
2543         t = g_strstrip (location);
2544
2545       if (t)
2546         gst_pulsesink_change_title (pulsesink, t);
2547
2548       g_free (title);
2549       g_free (artist);
2550       g_free (location);
2551       g_free (description);
2552       g_free (buf);
2553
2554       gst_pulsesink_change_props (pulsesink, l);
2555
2556       break;
2557     }
2558     case GST_EVENT_EOS:
2559       gst_pulsesink_flush_ringbuffer (pulsesink);
2560       break;
2561     default:
2562       ;
2563   }
2564
2565   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2566 }
2567
2568 static GstStateChangeReturn
2569 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
2570 {
2571   GstPulseSink *pulsesink = GST_PULSESINK (element);
2572   GstStateChangeReturn ret;
2573
2574   switch (transition) {
2575     case GST_STATE_CHANGE_NULL_TO_READY:
2576       g_mutex_lock (pa_shared_resource_mutex);
2577       if (!mainloop_ref_ct) {
2578         GST_INFO_OBJECT (element, "new pa main loop thread");
2579         if (!(mainloop = pa_threaded_mainloop_new ()))
2580           goto mainloop_failed;
2581         mainloop_ref_ct = 1;
2582         pa_threaded_mainloop_start (mainloop);
2583         g_mutex_unlock (pa_shared_resource_mutex);
2584       } else {
2585         GST_INFO_OBJECT (element, "reusing pa main loop thread");
2586         mainloop_ref_ct++;
2587         g_mutex_unlock (pa_shared_resource_mutex);
2588       }
2589       break;
2590     case GST_STATE_CHANGE_READY_TO_PAUSED:
2591       gst_element_post_message (element,
2592           gst_message_new_clock_provide (GST_OBJECT_CAST (element),
2593               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
2594       break;
2595     default:
2596       break;
2597   }
2598
2599   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2600   if (ret == GST_STATE_CHANGE_FAILURE)
2601     goto state_failure;
2602
2603   switch (transition) {
2604     case GST_STATE_CHANGE_PAUSED_TO_READY:
2605       gst_element_post_message (element,
2606           gst_message_new_clock_lost (GST_OBJECT_CAST (element),
2607               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));
2608       break;
2609     case GST_STATE_CHANGE_READY_TO_NULL:
2610       if (mainloop) {
2611         g_mutex_lock (pa_shared_resource_mutex);
2612         mainloop_ref_ct--;
2613         if (!mainloop_ref_ct) {
2614           GST_INFO_OBJECT (element, "terminating pa main loop thread");
2615           pa_threaded_mainloop_stop (mainloop);
2616           pa_threaded_mainloop_free (mainloop);
2617           mainloop = NULL;
2618         }
2619         g_mutex_unlock (pa_shared_resource_mutex);
2620       }
2621       break;
2622     default:
2623       break;
2624   }
2625
2626   return ret;
2627
2628   /* ERRORS */
2629 mainloop_failed:
2630   {
2631     g_mutex_unlock (pa_shared_resource_mutex);
2632     GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
2633         ("pa_threaded_mainloop_new() failed"), (NULL));
2634     return GST_STATE_CHANGE_FAILURE;
2635   }
2636 state_failure:
2637   {
2638     if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
2639       /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
2640       g_assert (mainloop);
2641       g_mutex_lock (pa_shared_resource_mutex);
2642       mainloop_ref_ct--;
2643       if (!mainloop_ref_ct) {
2644         GST_INFO_OBJECT (element, "terminating pa main loop thread");
2645         pa_threaded_mainloop_stop (mainloop);
2646         pa_threaded_mainloop_free (mainloop);
2647         mainloop = NULL;
2648       }
2649       g_mutex_unlock (pa_shared_resource_mutex);
2650     }
2651     return ret;
2652   }
2653 }