prevent thread leakage.
[aox:aox.git] / server / eventloop.cpp
1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2
3 #include "eventloop.h"
4
5 #include "connection.h"
6 #include "allocator.h"
7 #include "buffer.h"
8 #include "estring.h"
9 #include "server.h"
10 #include "scope.h"
11 #include "timer.h"
12 #include "graph.h"
13 #include "event.h"
14 #include "list.h"
15 #include "log.h"
16
17 // time
18 #include <time.h>
19 // errno
20 #include <errno.h>
21 // struct timeval, fd_set
22 #include <sys/time.h>
23 #include <sys/types.h>
24 #include <sys/select.h>
25 // getsockopt, SOL_SOCKET, SO_ERROR
26 #include <sys/socket.h>
27 // read, select
28 #include <unistd.h>
29 // ioctl, FIONREAD
30 #include <sys/ioctl.h>
31
32 // memset (for FD_* under OpenBSD)
33 #include <string.h>
34
35
36 static bool freeMemorySoon;
37
38
39 static EventLoop * loop;
40
41
42 class LoopData
43     : public Garbage
44 {
45 public:
46     LoopData()
47         : log( new Log ), startup( false ),
48           stop( false ), limit( 16 * 1024 * 1024 )
49     {}
50
51     Log *log;
52     bool startup;
53     bool stop;
54     List< Connection > connections;
55     List< Timer > timers;
56     uint limit;
57
58     class Stopper
59         : public EventHandler
60     {
61     public:
62         Stopper( uint s ): stage2( false ) {
63             (void)new Timer( this, s );
64             if ( s <= 10 )
65                 stage2 = true;
66         }
67         void execute() {
68             if ( !EventLoop::global() || EventLoop::global()->inShutdown() )
69                 return;
70             if ( stage2 )
71                 EventLoop::global()->stop();
72             else
73                 EventLoop::global()->stop( 10 );
74         }
75         bool stage2;
76     };
77 };
78
79
80 /*! \class EventLoop eventloop.h
81     This class dispatches event notifications to a list of Connections.
82
83     An EventLoop maintains a list of participating Connection objects,
84     and periodically informs them about any events (e.g., read/write,
85     errors, timeouts) that occur. The loop continues until something
86     calls stop().
87 */
88
89
90 /*! Creates the global EventLoop object or, if \a l is non-zero, sets
91     the global EventLoop to \a l. This function expects to be called
92     very early during the startup sequence.
93 */
94
95 void EventLoop::setup( EventLoop * l )
96 {
97     ::loop = l;
98     if ( !l )
99         ::loop = new EventLoop;
100     Allocator::addEternal( ::loop, "global event loop" );
101 }
102
103
104 /*! Creates a new EventLoop. */
105
106 EventLoop::EventLoop()
107     : d( new LoopData )
108 {
109 }
110
111
112 /*! Exists only to avoid compiler warnings. */
113
114 EventLoop::~EventLoop()
115 {
116 }
117
118
119 /*! Adds \a c to this EventLoop's list of active Connections.
120
121     If shutdown() has been called already, addConnection() ignores \a
122     c, so that shutdown proceeds unhampered. This is likely to disturb
123     \a c a little, but it's better than the alternative: Aborting the
124     shutdown.
125 */
126
127 void EventLoop::addConnection( Connection * c )
128 {
129     if ( d->stop ) {
130         log( "Cannot add new Connection objects during shutdown",
131              Log::Error );
132         return;
133     }
134
135     Scope x( d->log );
136
137     if ( d->connections.find( c ) )
138         return;
139
140     d->connections.prepend( c );
141     setConnectionCounts();
142 }
143
144
145 /*! Removes \a c from this EventLoop's list of active
146     Connections.
147 */
148
149 void EventLoop::removeConnection( Connection * c )
150 {
151     Scope x( d->log );
152
153     if ( d->connections.remove( c ) == 0 )
154         return;
155     setConnectionCounts();
156
157     // if this is a server, with external connections, and we just
158     // closed the last external connection, then we shut down
159     // nicely. otherwise, we just remove the specified connection,
160     // with no magic.
161
162     if ( c->hasProperty( Connection::Internal ) )
163         return;
164
165     if ( d->stop )
166         return;
167
168     List< Connection >::Iterator it( d->connections );
169     while ( it ) {
170         if ( !it->hasProperty( Connection::Internal ) )
171             return;
172         ++it;
173     }
174     stop( 2 );
175 }
176
177
178 /*! Returns a (non-zero) pointer to the list of Connections that have
179     been added to this EventLoop.
180 */
181
182 List< Connection > *EventLoop::connections() const
183 {
184     return &d->connections;
185 }
186
187
188 static GraphableNumber * sizeinram = 0;
189
190 static const uint gcDelay = 30;
191
192
193 /*! Starts the EventLoop and runs it until stop() is called. */
194
195 void EventLoop::start()
196 {
197     Scope x( d->log );
198     time_t gc = time(0);
199     bool haveLoggedStartup = false;
200
201     log( "Starting event loop", Log::Debug );
202
203     while ( !d->stop && !Log::disastersYet() ) {
204         if ( !haveLoggedStartup && !inStartup() ) {
205             if ( !Server::name().isEmpty() )
206                 log( Server::name() + ": Server startup complete",
207                      Log::Significant );
208             haveLoggedStartup = true;
209         }
210
211         Connection * c;
212
213         uint timeout = gcDelay;
214         int maxfd = -1;
215
216         fd_set r, w;
217         FD_ZERO( &r );
218         FD_ZERO( &w );
219
220         // Figure out what events each connection wants.
221
222         List< Connection >::Iterator it( d->connections );
223         while ( it ) {
224             c = it;
225             ++it;
226
227             int fd = c->fd();
228             if ( fd < 0 ) {
229                 removeConnection( c );
230             }
231             else if ( c->type() == Connection::Listener && inStartup() ) {
232                 // we don't accept new connections until we've
233                 // completed startup
234             }
235             else {
236                 if ( fd > maxfd )
237                     maxfd = fd;
238                 FD_SET( fd, &r );
239                 if ( c->canWrite() ||
240                      c->state() == Connection::Connecting ||
241                      c->state() == Connection::Closing )
242                     FD_SET( fd, &w );
243                 if ( c->timeout() > 0 && c->timeout() < timeout )
244                     timeout = c->timeout();
245             }
246         }
247
248         // Figure out whether any timers need attention soon
249
250         List< Timer >::Iterator t( d->timers );
251         while ( t ) {
252             if ( t->active() && t->timeout() < timeout )
253                 timeout = t->timeout();
254             ++t;
255         }
256
257         // Look for interesting input
258
259         struct timeval tv;
260         tv.tv_sec = timeout - time( 0 );
261         tv.tv_usec = 0;
262
263         if ( tv.tv_sec < 0 )
264             tv.tv_sec = 0;
265         if ( tv.tv_sec > 60 )
266             tv.tv_sec = 60;
267
268         // we never ask the OS to sleep shorter than .2 seconds
269         if ( tv.tv_sec < 1 )
270             tv.tv_usec = 200000;
271
272         if ( select( maxfd+1, &r, &w, 0, &tv ) < 0 ) {
273             // r and w are undefined. we clear them, and dispatch()
274             // won't jump to conclusions
275             FD_ZERO( &r );
276             FD_ZERO( &w );
277         }
278         time_t now = time( 0 );
279
280         // Graph our size before processing events
281         if ( !sizeinram )
282             sizeinram = new GraphableNumber( "memory-used" );
283         sizeinram->setValue( Allocator::inUse() + Allocator::allocated() );
284
285         // Any interesting timers?
286
287         if ( !d->timers.isEmpty() ) {
288             uint now = time( 0 );
289             t = d->timers.first();
290             while ( t ) {
291                 Timer * tmp = t;
292                 ++t;
293                 if ( tmp->active() && tmp->timeout() <= now )
294                     tmp->execute();
295             }
296         }
297
298         // Figure out what each connection cares about.
299
300         it = d->connections.first();
301         while ( it ) {
302             c = it;
303             ++it;
304             int fd = c->fd();
305             if ( fd >= 0 ) {
306                 dispatch( c, FD_ISSET( fd, &r ), FD_ISSET( fd, &w ), now );
307                 FD_CLR( fd, &r );
308                 FD_CLR( fd, &w );
309             }
310             else {
311                 removeConnection( c );
312             }
313         }
314
315         // Graph our size after processing all the events too
316
317         sizeinram->setValue( Allocator::inUse() + Allocator::allocated() );
318
319         // Collect garbage if someone asks for it, or if we've passed
320         // the memory usage goal. This has to be at the end of the
321         // scope, since anything referenced by local variables might
322         // be freed here.
323
324         if ( !d->stop ) {
325             if ( !::freeMemorySoon ) {
326                 uint a = Allocator::inUse() + Allocator::allocated();
327                 if ( now < gc ) {
328                     // time went backwards, best to be paranoid
329                     ::freeMemorySoon = true;
330                 }
331                 else {
332                     // if we're below the limit, we don't modify the
333                     // limit. if we're above, but below 2x, we halve
334                     // the period (right-shift by one bit). if we're at
335                     // 2-3x, we right-shift by two. if we're at 3-4x,
336                     // we right-shift by three, etc.
337
338                     // if memory usage is extreme enough we'll collect
339                     // garbage every second.
340                     uint factor = a / d->limit;
341                     uint period = gcDelay >> factor;
342                     if ( (uint)(now - gc) > period )
343                         ::freeMemorySoon = true;
344                 }
345             }
346             if ( ::freeMemorySoon ) {
347                 freeMemory();
348                 gc = time( 0 );
349                 ::freeMemorySoon = false;
350             }
351         }
352     }
353
354     // This is for event loop shutdown. A little brutal. With any
355     // luck, the listeners have been closed long ago and this is just
356     // for those who wouldn't disconnect voluntarily.
357     log( "Shutting down event loop", Log::Debug );
358     List< Connection >::Iterator it( d->connections );
359     while ( it ) {
360         Connection * c = it;
361         ++it;
362         try {
363             Scope x( c->log() );
364             if ( c->state() == Connection::Connected )
365                 c->react( Connection::Shutdown );
366             if ( c->state() == Connection::Connected )
367                 c->write();
368             if ( c->writeBuffer()->size() > 0 )
369                 c->log( "Still have " +
370                         EString::humanNumber( c->writeBuffer()->size() ) +
371                         " bytes to write", Log::Debug );
372         } catch ( Exception e ) {
373             // we don't really care at this point, do we?
374         }
375     }
376
377     log( "Event loop stopped", Log::Debug );
378 }
379
380
381 /*! Calls Allocator::free() and does any necessary pre- and
382     postprocessing.
383 */
384
385 void EventLoop::freeMemory()
386 {
387     List<Garbage> x;
388     List<Connection>::Iterator i( d->connections );
389     while ( i ) {
390         Connection * c = i;
391         if ( !(c->hasProperty( Connection::Listens )) )
392             x.append( c );
393         ++i;
394     }
395     Garbage * biggest = Allocator::free( &x );
396     // x now points to free memory
397     i = d->connections.first();
398     Connection * victim = 0;
399     while ( i ) {
400         Connection * c = i;
401         if ( c == biggest )
402             victim = i;
403         ++i;
404     }
405
406     if ( victim && Allocator::inUse() > d->limit ) {
407         ::log( "Closing connection due to memory overload: " +
408                victim->description() );
409         victim->react( Connection::Shutdown );
410         victim->close();
411     }
412 }
413
414
415 /*! Dispatches events to the connection \a c, based on its current
416     state, the time \a now and the results from select: \a r is true
417     if the FD may be read, and \a w is true if we know that the FD may
418     be written to. If \a now is past that Connection's timeout, we
419     must send a Timeout event.
420 */
421
422 void EventLoop::dispatch( Connection * c, bool r, bool w, uint now )
423 {
424     int dummy1;
425     socklen_t dummy2;
426     dummy2 = sizeof(dummy1);
427     if ( ::getsockopt( c->fd(), SOL_SOCKET, SO_RCVBUF,
428                        &dummy1, &dummy2 ) < 0 ) {
429         c->close();
430         removeConnection( c );
431         return;
432     }
433
434     try {
435         Scope x( c->log() );
436         if ( c->timeout() != 0 && now >= c->timeout() ) {
437             c->setTimeout( 0 );
438             c->react( Connection::Timeout );
439         }
440
441         if ( c->state() == Connection::Connecting ) {
442             bool error = false;
443             bool connected = false;
444
445             if ( ( w && !r ) || c->isPending( Connection::Connect ) ) {
446                 connected = true;
447             }
448             else if ( c->isPending( Connection::Error ) ) {
449                 error = true;
450             }
451             else if ( w && r ) {
452                 // This might indicate a connection error, or a successful
453                 // connection with outstanding data. (Stevens suggests the
454                 // getsockopt to disambiguate the two, cf. UNPv1 15.4.)
455                 int errval;
456                 int errlen = sizeof( int );
457                 ::getsockopt( c->fd(), SOL_SOCKET, SO_ERROR, (void *)&errval,
458                               (socklen_t *)&errlen );
459
460                 if ( errval == 0 )
461                     connected = true;
462                 else
463                     error = true;
464             }
465
466             if ( connected ) {
467                 c->setState( Connection::Connected );
468                 c->react( Connection::Connect );
469             }
470             else if ( error ) {
471                 c->react( Connection::Error );
472                 c->setState( Connection::Closing );
473                 r = false;
474             }
475         }
476
477         if ( r ) {
478             bool gone = false;
479             if ( !c->hasProperty( Connection::Listens ) ) {
480                 int a = 0;
481                 int r = ioctl( c->fd(), FIONREAD, &a );
482                 if ( r >= 0 && a == 0 )
483                     gone = true;
484             }
485
486             c->read();
487             c->react( Connection::Read );
488
489             if ( gone ) {
490                 c->setState( Connection::Closing );
491                 c->react( Connection::Close );
492             }
493         }
494
495         uint s = c->writeBuffer()->size();
496         c->write();
497         // if we're closing anyway, and we can't write any of what we
498         // want to write, then just forget the buffered data and go on
499         // with the close
500         if ( c->state() == Connection::Closing &&
501              s && s == c->writeBuffer()->size() )
502             c->writeBuffer()->remove( s );
503     }
504     catch ( Exception e ) {
505         EString s;
506         switch (e) {
507         case Invariant:
508             s = "Invariant failed";
509             break;
510         case Memory:
511             s = "Out of memory";
512             break;
513         case FD:
514             s = "FD error";
515             break;
516         };
517         s.append( " while processing " + c->description() );
518         d->log->log( s, Log::Error );
519         if ( !c->hasProperty( Connection::Listens ) )
520             c->close();
521     }
522
523     if ( c->state() == Connection::Closing && !c->canWrite() )
524         c->close();
525     if ( !c->valid() )
526         removeConnection( c );
527 }
528
529
530 /*! Instructs this EventLoop to perform an orderly shutdown in \a s
531     seconds, by sending each participating Connection a Shutdown event
532     before closing.
533
534     Listener connections are closed right away, some/all external
535     connections get a Shutdown event at once, everyone get a Shutdown
536     event at final shutdown.
537 */
538
539 void EventLoop::stop( uint s )
540 {
541     if ( !s ) {
542         d->stop = true;
543         return;
544     }
545
546     (void)new LoopData::Stopper( s );
547     List<Connection>::Iterator i( d->connections );
548     while ( i ) {
549         Connection * c = i;
550         ++i;
551         try {
552             Scope x( c->log() );
553             if ( c->hasProperty( Connection::Listens ) ) {
554                 c->react( Connection::Shutdown );
555                 c->close();
556             }
557             else if ( s <= 10 && !c->hasProperty( Connection::Internal ) ) {
558                 c->react( Connection::Shutdown );
559             }
560         } catch ( Exception e ) {
561             removeConnection( c );
562         }
563     }
564 }
565
566
567 /*! Closes all Connections except \a c1 and \a c2. This helps TlsProxy
568     do its work.
569 */
570
571 void EventLoop::closeAllExcept( Connection * c1, Connection * c2 )
572 {
573     List< Connection >::Iterator it( d->connections );
574     while ( it ) {
575         Connection * c = it;
576         ++it;
577         if ( c != c1 && c != c2 )
578             c->close();
579     }
580 }
581
582
583 /*! Closes all Connection except Listeners. When we fork, this allows
584     us to keep the connections on one side of the fence.
585 */
586
587 void EventLoop::closeAllExceptListeners()
588 {
589     List< Connection >::Iterator it( d->connections );
590     while ( it ) {
591         Connection * c = it;
592         ++it;
593         if ( c->type() != Connection::Listener )
594             c->close();
595     }
596 }
597
598
599 /*! Flushes the write buffer of all connections. */
600
601 void EventLoop::flushAll()
602 {
603     List< Connection >::Iterator it( d->connections );
604     while ( it ) {
605         Connection * c = it;
606         ++it;
607         c->write();
608     }
609 }
610
611
612 /*! Returns true if this EventLoop is still attending to startup chores,
613     and not yet processing Listener requests.
614 */
615
616 bool EventLoop::inStartup() const
617 {
618     return d->startup;
619 }
620
621
622 /*! Sets the startup state of this EventLoop to \a p. If \a p is true,
623     then Listeners will not be processed until this function is called
624     again with \a p set to false.
625 */
626
627 void EventLoop::setStartup( bool p )
628 {
629     d->startup = p;
630 }
631
632
633 /*! Returns true if this EventLoop is shutting down (ie. stop() has
634     been called), and false if it's starting up or operating normally.
635 */
636
637 bool EventLoop::inShutdown() const
638 {
639     return d->stop;
640 }
641
642
643 /*! Returns a pointer to the global event loop, or 0 if setup() has not
644     yet been called.
645 */
646
647 EventLoop * EventLoop::global()
648 {
649     return ::loop;
650 }
651
652
653 /*! This static function is just a convenient shorthand for calling
654     stop() on the global() EventLoop.
655 */
656
657 void EventLoop::shutdown()
658 {
659     ::loop->stop();
660 }
661
662
663 /*! Records that \a t exists, so that the event loop will process \a
664     t.
665 */
666
667 void EventLoop::addTimer( Timer * t )
668 {
669     d->timers.append( t );
670 }
671
672
673 /*! Forgets that \a t exists. The event loop will henceforth never
674     call \a t.
675 */
676
677 void EventLoop::removeTimer( Timer * t )
678 {
679     List<Timer>::Iterator i( d->timers.find( t ) );
680     if ( i )
681         d->timers.take( i );
682 }
683
684 static GraphableNumber * imapgraph = 0;
685 static GraphableNumber * pop3graph = 0;
686 static GraphableNumber * smtpgraph = 0;
687 static GraphableNumber * othergraph = 0;
688 static GraphableNumber * internalgraph = 0;
689 static GraphableNumber * httpgraph = 0;
690 static GraphableNumber * dbgraph = 0;
691
692
693
694 /*! Scans the event loop and stores the current number of different
695     connections using GraphableNumber.
696 */
697
698 void EventLoop::setConnectionCounts()
699 {
700     uint imap = 0;
701     uint pop3 = 0;
702     uint smtp = 0;
703     uint other = 0;
704     uint internal = 0;
705     uint http = 0;
706     uint db = 0;
707     bool listeners = false;
708     List<Connection>::Iterator c( d->connections );
709     while ( c ) {
710         switch( c->type() ) {
711         case Connection::Client:
712         case Connection::LogServer:
713         case Connection::GraphDumper:
714         case Connection::LogClient:
715         case Connection::TlsProxy:
716         case Connection::TlsClient:
717         case Connection::RecorderClient:
718         case Connection::RecorderServer:
719         case Connection::Pipe:
720             internal++;
721             break;
722         case Connection::DatabaseClient:
723             db++;
724             break;
725         case Connection::ImapServer:
726             imap++;
727             break;
728         case Connection::SmtpServer:
729             smtp++;
730             break;
731         case Connection::SmtpClient:
732         case Connection::ManageSieveServer:
733         case Connection::EGDServer:
734         case Connection::LdapRelay:
735             other++;
736             break;
737         case Connection::Pop3Server:
738             pop3++;
739             break;
740         case Connection::HttpServer:
741             http++;
742             break;
743         case Connection::Listener:
744             listeners = true;
745             // we don't count these, we only count connections
746             break;
747         }
748         ++c;
749     }
750     if ( !listeners )
751         return;
752     if ( !imapgraph ) {
753         imapgraph = new GraphableNumber( "imap-connections" );
754         pop3graph = new GraphableNumber( "pop3-connections" );
755         smtpgraph = new GraphableNumber( "smtp-connections" );
756         othergraph = new GraphableNumber( "other-connections" );
757         internalgraph = new GraphableNumber( "internal-connections" );
758         httpgraph = new GraphableNumber( "http-connections" );
759         dbgraph = new GraphableNumber( "db-connections" );
760     }
761     imapgraph->setValue( imap );
762     pop3graph->setValue( pop3 );
763     smtpgraph->setValue( smtp );
764     othergraph->setValue( other );
765     internalgraph->setValue( internal );
766     httpgraph->setValue( http );
767     dbgraph->setValue( db );
768 }
769
770
771 /*! Stops all the SSL-enabled Listeners. */
772
773 void EventLoop::shutdownSSL()
774 {
775     log( "Shutting down SSL-enabled Listeners", Log::Error );
776     List< Connection >::Iterator it( d->connections );
777     while ( it ) {
778         Connection * c = it;
779         ++it;
780         if ( c->hasProperty( Connection::Listens ) &&
781              c->hasProperty( Connection::StartsSSL ) )
782             c->close();
783     }
784 }
785
786
787 /*! Requests the event loop to collect garbage and clear any caches at
788     the earliest opportunity. Used for debugging.
789 */
790
791 void EventLoop::freeMemorySoon()
792 {
793     ::freeMemorySoon = true;
794 }
795
796
797 /*! Instructs this event loop to collect garbage when memory usage
798     passes \a limit bytes. The default is 0, which means to collect
799     garbage even if very little is being used.
800 */
801
802 void EventLoop::setMemoryUsage( uint limit )
803 {
804     d->limit = limit;
805 }
806
807
808 /*! Returns whatever setMemoryUsage() has recorded, or 0 meaning to
809     collect garbage often.
810 */
811
812 uint EventLoop::memoryUsage() const
813 {
814     return d->limit;
815 }