pupnp (libupnp) snapshot from SourceForge: git clone git://pupnp.git.sourceforge...
[igd2-for-linux:pandonghui1211s-igd2-for-linux.git] / pupnp_branch-1.6.x / threadutil / src / ThreadPool.c
1 ///////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000-2003 Intel Corporation 
4 // All rights reserved. 
5 //
6 // Redistribution and use in source and binary forms, with or without 
7 // modification, are permitted provided that the following conditions are met: 
8 //
9 // * Redistributions of source code must retain the above copyright notice, 
10 // this list of conditions and the following disclaimer. 
11 // * Redistributions in binary form must reproduce the above copyright notice, 
12 // this list of conditions and the following disclaimer in the documentation 
13 // and/or other materials provided with the distribution. 
14 // * Neither name of Intel Corporation nor the names of its contributors 
15 // may be used to endorse or promote products derived from this software 
16 // without specific prior written permission.
17 // 
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR 
22 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
23 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
24 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
25 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 
26 // OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
28 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 ///////////////////////////////////////////////////////////////////////////
31
32 #include "ThreadPool.h"
33 #include "FreeList.h"
34 #include <assert.h>
35 #include <stdlib.h>
36
37 #include <stdio.h>
38
39 /****************************************************************************
40  * Function: DiffMillis
41  *
42  *  Description:
43  *      Returns the difference in milliseconds between two
44  *      timeval structures.
45  *      Internal Only.
46  *  Parameters:
47  *      struct timeval *time1,
48  *      struct timeval *time2,
49  *  Returns:
50  *       the difference in milliseconds, time1-time2.
51  *****************************************************************************/
52 static unsigned long DiffMillis( struct timeval *time1, struct timeval *time2 )
53 {
54         double temp = 0;
55
56         assert( time1 != NULL );
57         assert( time2 != NULL );
58
59         temp = time1->tv_sec - time2->tv_sec;
60         /* convert to milliseconds */
61         temp *= 1000;
62
63         /* convert microseconds to milliseconds and add to temp */
64         /* implicit flooring of unsigned long data type */
65         temp += (time1->tv_usec - time2->tv_usec) / 1000;
66
67         return temp;
68 }
69
70 #ifdef STATS
71 /****************************************************************************
72  * Function: StatsInit
73  *
74  *  Description:
75  *      Initializes the statistics structure.
76  *      Internal Only.
77  *  Parameters:
78  *      ThreadPoolStats *stats must be valid non null stats structure
79  *****************************************************************************/
80 static void StatsInit( ThreadPoolStats *stats )
81 {
82         assert( stats != NULL );
83
84         stats->totalIdleTime = 0;
85         stats->totalJobsHQ = 0;
86         stats->totalJobsLQ = 0;
87         stats->totalJobsMQ = 0;
88         stats->totalTimeHQ = 0;
89         stats->totalTimeMQ = 0;
90         stats->totalTimeLQ = 0;
91         stats->totalWorkTime = 0;
92         stats->totalIdleTime = 0;
93         stats->avgWaitHQ = 0;
94         stats->avgWaitMQ = 0;
95         stats->avgWaitLQ = 0;
96         stats->workerThreads = 0;
97         stats->idleThreads = 0;
98         stats->persistentThreads = 0;
99         stats->maxThreads = 0; stats->totalThreads = 0;
100 }
101
102 static void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime )
103 {
104         tp->stats.totalJobsLQ++;
105         tp->stats.totalTimeLQ += diffTime;
106 }
107
108 static void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime )
109 {
110         tp->stats.totalJobsMQ++;
111         tp->stats.totalTimeMQ += diffTime;
112 }
113
114 static void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime )
115 {
116         tp->stats.totalJobsHQ++;
117         tp->stats.totalTimeHQ += diffTime;
118 }
119
120 /****************************************************************************
121  * Function: CalcWaitTime
122  *
123  *  Description:
124  *      Calculates the time the job has been waiting at the specified
125  *      priority. Adds to the totalTime and totalJobs kept in the
126  *      thread pool statistics structure.
127  *      Internal Only.
128  *
129  *  Parameters:
130  *      ThreadPool *tp
131  *      ThreadPriority p
132  *      ThreadPoolJob *job
133  *****************************************************************************/
134 static void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job )
135 {
136         struct timeval now;
137         unsigned long diff;
138
139         assert( tp != NULL );
140         assert( job != NULL );
141
142         gettimeofday( &now, NULL );
143         diff = DiffMillis( &now, &job->requestTime );
144         switch ( p ) {
145         case LOW_PRIORITY:
146                 StatsAccountLQ( tp, diff );
147                 break;
148         case MED_PRIORITY:
149                 StatsAccountMQ( tp, diff );
150                 break;
151         case HIGH_PRIORITY:
152                 StatsAccountHQ( tp, diff );
153                 break;
154         default:
155                 assert( 0 );
156         }
157 }
158
159 static time_t StatsTime( time_t *t )
160 {
161         struct timeval tv;
162
163         gettimeofday( &tv, NULL );
164         if (t) {
165                 *t = tv.tv_sec;
166         }
167
168         return tv.tv_sec;
169 }
170 #else /* STATS */
171 static UPNP_INLINE void StatsInit( ThreadPoolStats *stats ) {}
172 static UPNP_INLINE void StatsAccountLQ( ThreadPool *tp, unsigned long diffTime ) {}
173 static UPNP_INLINE void StatsAccountMQ( ThreadPool *tp, unsigned long diffTime ) {}
174 static UPNP_INLINE void StatsAccountHQ( ThreadPool *tp, unsigned long diffTime ) {}
175 static UPNP_INLINE void CalcWaitTime( ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job ) {}
176 static UPNP_INLINE time_t StatsTime( time_t *t ) { return 0; }
177 #endif /* STATS */
178
179 /****************************************************************************
180  * Function: CmpThreadPoolJob
181  *
182  *  Description:
183  *      Compares thread pool jobs.
184  *  Parameters:
185  *      void * - job A
186  *      void * - job B
187  *****************************************************************************/
188 static int CmpThreadPoolJob( void *jobA, void *jobB )
189 {
190         ThreadPoolJob *a = ( ThreadPoolJob *) jobA;
191         ThreadPoolJob *b = ( ThreadPoolJob *) jobB;
192
193         assert( jobA != NULL );
194         assert( jobB != NULL );
195
196         return ( a->jobId == b->jobId );
197 }
198
199 /****************************************************************************
200  * Function: FreeThreadPoolJob
201  *
202  *  Description:
203  *      Deallocates a dynamically allocated ThreadPoolJob.
204  *  Parameters:
205  *      ThreadPoolJob *tpj - must be allocated with CreateThreadPoolJob
206  *****************************************************************************/
207 static void FreeThreadPoolJob( ThreadPool *tp, ThreadPoolJob *tpj )
208 {
209         assert( tp != NULL );
210
211         FreeListFree( &tp->jobFreeList, tpj );
212 }
213
214 /****************************************************************************
215  * Function: SetPolicyType
216  *
217  *  Description:
218  *      Sets the scheduling policy of the current process.
219  *      Internal only.
220  *  Parameters:
221  *      PolocyType in
222  *  Returns:
223  *      0 on success, nonzero on failure
224  *      Returns result of GetLastError() on failure.
225  *
226  *****************************************************************************/
227 static int SetPolicyType( PolicyType in )
228 {
229 #ifdef __CYGWIN__
230         /* TODO not currently working... */
231         return 0;
232 #elif defined(__OSX__) || defined(__APPLE__)
233         setpriority( PRIO_PROCESS, 0, 0 );
234         return 0;
235 #elif defined(WIN32)
236         return sched_setscheduler( 0, in );
237 #elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
238         struct sched_param current;
239
240         sched_getparam( 0, &current );
241         current.sched_priority = DEFAULT_SCHED_PARAM;
242         return sched_setscheduler( 0, in, &current );
243 #else
244         return 0;
245 #endif
246 }
247
248 /****************************************************************************
249  * Function: SetPriority
250  *
251  *  Description:
252  *      Sets the priority of the currently running thread.
253  *      Internal only.
254  *  Parameters:
255  *      ThreadPriority priority
256  *  Returns:
257  *      0 on success, nonzero on failure
258  *      EINVAL invalid priority
259  *      Returns result of GerLastError on failure.
260  *
261  *****************************************************************************/
262 static int SetPriority( ThreadPriority priority )
263 {
264 #if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
265         int currentPolicy;
266         int minPriority = 0;
267         int maxPriority = 0;
268         int actPriority = 0;
269         int midPriority = 0;
270         struct sched_param newPriority;
271
272         pthread_getschedparam( ithread_self(), &currentPolicy, &newPriority );
273         minPriority = sched_get_priority_min( currentPolicy );
274         maxPriority = sched_get_priority_max( currentPolicy );
275         midPriority = ( maxPriority - minPriority ) / 2;
276         switch ( priority ) {
277         case LOW_PRIORITY:
278                 actPriority = minPriority;
279                 break;
280         case MED_PRIORITY:
281                 actPriority = midPriority;
282                 break;
283         case HIGH_PRIORITY:
284                 actPriority = maxPriority;
285                 break;
286         default:
287                 return EINVAL;
288         };
289
290         newPriority.sched_priority = actPriority;
291
292         return pthread_setschedparam(ithread_self(), currentPolicy, &newPriority );
293 #else
294         return 0;
295 #endif
296 }
297
298 /****************************************************************************
299  * Function: BumpPriority
300  *
301  *  Description:
302  *      Determines whether any jobs
303  *      need to be bumped to a higher priority Q and bumps them.
304  *
305  *      tp->mutex must be locked.
306  *      Internal Only.
307  *  Parameters:
308  *      ThreadPool *tp
309  *****************************************************************************/
310 static void BumpPriority( ThreadPool *tp )
311 {
312     int done = 0;
313     struct timeval now;
314     unsigned long diffTime = 0;
315     ThreadPoolJob *tempJob = NULL;
316
317     assert( tp != NULL );
318
319     gettimeofday(&now, NULL);   
320
321     while( !done ) {
322         if( tp->medJobQ.size ) {
323             tempJob = ( ThreadPoolJob *) tp->medJobQ.head.next->item;
324             diffTime = DiffMillis( &now, &tempJob->requestTime );
325             if( diffTime >= ( tp->attr.starvationTime ) ) {
326                 // If job has waited longer than the starvation time
327                 // bump priority (add to higher priority Q)
328                 StatsAccountMQ( tp, diffTime );
329                 ListDelNode( &tp->medJobQ, tp->medJobQ.head.next, 0 );
330                 ListAddTail( &tp->highJobQ, tempJob );
331                 continue;
332             }
333         }
334         if( tp->lowJobQ.size ) {
335             tempJob = ( ThreadPoolJob *) tp->lowJobQ.head.next->item;
336             diffTime = DiffMillis( &now, &tempJob->requestTime );
337             if( diffTime >= ( tp->attr.maxIdleTime ) ) {
338                 // If job has waited longer than the starvation time
339                 // bump priority (add to higher priority Q)
340                 StatsAccountLQ( tp, diffTime );
341                 ListDelNode( &tp->lowJobQ, tp->lowJobQ.head.next, 0 );
342                 ListAddTail( &tp->medJobQ, tempJob );
343                 continue;
344             }
345         }
346         done = 1;
347     }
348 }
349
350 /****************************************************************************
351  * Function: SetRelTimeout
352  *
353  *  Description:
354  *      Sets the fields of the
355  *      passed in timespec to be relMillis milliseconds in the future.
356  *      Internal Only.
357  *  Parameters:
358  *      struct timespec *time
359  *      int relMillis - milliseconds in the future
360  *****************************************************************************/
361 static void SetRelTimeout( struct timespec *time, int relMillis )
362 {
363         struct timeval now;
364         int sec = relMillis / 1000;
365         int milliSeconds = relMillis % 1000;
366
367         assert( time != NULL );
368
369         gettimeofday( &now, NULL );
370
371         time->tv_sec = now.tv_sec + sec;
372         time->tv_nsec = ( (now.tv_usec/1000) + milliSeconds ) * 1000000;
373 }
374
375 /****************************************************************************
376  * Function: SetSeed
377  *
378  *  Description:
379  *      Sets seed for random number generator.
380  *      Each thread sets the seed random number generator.
381  *      Internal Only.
382  *  Parameters:
383  *      
384  *****************************************************************************/
385 static void SetSeed()
386 {
387         struct timeval t;
388   
389         gettimeofday(&t, NULL);
390 #if defined(WIN32)
391         srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id().p );
392 #elif defined(__FreeBSD__) || defined(__OSX__) || defined(__APPLE__)
393         srand( ( unsigned int )t.tv_usec + (unsigned int)ithread_get_current_thread_id() );
394 #elif defined(__linux__) || defined(__sun) || defined(__CYGWIN__) || defined(__GLIBC__)
395         srand( ( unsigned int )t.tv_usec + ithread_get_current_thread_id() );
396 #else
397         {
398                 volatile union { volatile pthread_t tid; volatile unsigned i; } idu;
399
400                 idu.tid = ithread_get_current_thread_id();
401                 srand( ( unsigned int )t.millitm + idu.i );
402         }
403 #endif
404 }
405
406 /****************************************************************************
407  * Function: WorkerThread
408  *
409  *  Description:
410  *      Implements a thread pool worker.
411  *      Worker waits for a job to become available.
412  *      Worker picks up persistent jobs first, high priority, med priority,
413  *             then low priority.
414  *      If worker remains idle for more than specified max, the worker
415  *      is released.
416  *      Internal Only.
417  *  Parameters:
418  *      void * arg -> is cast to ThreadPool *
419  *****************************************************************************/
420 static void *WorkerThread( void *arg )
421 {
422         time_t start = 0;
423
424         ThreadPoolJob *job = NULL;
425         ListNode *head = NULL;
426
427         struct timespec timeout;
428         int retCode = 0;
429         int persistent = -1;
430         ThreadPool *tp = ( ThreadPool *) arg;
431         // allow static linking
432 #ifdef WIN32
433 #ifdef PTW32_STATIC_LIB
434         pthread_win32_thread_attach_np();
435 #endif
436 #endif
437         assert( tp != NULL );
438
439         // Increment total thread count
440         ithread_mutex_lock( &tp->mutex );
441         tp->totalThreads++;
442         ithread_cond_broadcast( &tp->start_and_shutdown );
443         ithread_mutex_unlock( &tp->mutex );
444
445         SetSeed();
446         StatsTime( &start );
447         while( 1 ) {
448                 ithread_mutex_lock( &tp->mutex );
449                 if( job ) {
450                         FreeThreadPoolJob( tp, job );
451                         job = NULL;
452                 }
453                 retCode = 0;
454
455                 tp->stats.idleThreads++;
456                 tp->stats.totalWorkTime += ( StatsTime( NULL ) - start ); // work time
457                 StatsTime( &start ); // idle time
458
459                 if( persistent == 1 ) {
460                         // Persistent thread
461                         // becomes a regular thread
462                         tp->persistentThreads--;
463                 }
464
465                 if( persistent == 0 ) {
466                         tp->stats.workerThreads--;
467                 }
468
469                 // Check for a job or shutdown
470                 while( tp->lowJobQ.size  == 0 &&
471                        tp->medJobQ.size  == 0 &&
472                        tp->highJobQ.size == 0 &&
473                        !tp->persistentJob     &&
474                        !tp->shutdown ) {
475                         // If wait timed out
476                         // and we currently have more than the
477                         // min threads, or if we have more than the max threads
478                         // (only possible if the attributes have been reset)
479                         // let this thread die.
480                         if( ( retCode == ETIMEDOUT &&
481                               tp->totalThreads > tp->attr.minThreads ) ||
482                             ( tp->attr.maxThreads != -1 &&
483                               tp->totalThreads > tp->attr.maxThreads ) ) {
484                                 tp->stats.idleThreads--;
485                                 tp->totalThreads--;
486                                 ithread_cond_broadcast( &tp->start_and_shutdown );
487                                 ithread_mutex_unlock( &tp->mutex );
488 #ifdef WIN32
489 #ifdef PTW32_STATIC_LIB
490                                 // allow static linking
491                                 pthread_win32_thread_detach_np ();
492 #endif
493 #endif
494                                 return NULL;
495                         }
496                         SetRelTimeout( &timeout, tp->attr.maxIdleTime );
497
498                         // wait for a job up to the specified max time
499                         retCode = ithread_cond_timedwait(
500                                 &tp->condition, &tp->mutex, &timeout );
501                 }
502
503                 tp->stats.idleThreads--;
504                 tp->stats.totalIdleTime += ( StatsTime( NULL ) - start ); // idle time
505                 StatsTime( &start ); // work time
506
507                 // bump priority of starved jobs
508                 BumpPriority( tp );
509
510                 // if shutdown then stop
511                 if( tp->shutdown ) {
512                         tp->totalThreads--;
513                         ithread_cond_broadcast( &tp->start_and_shutdown );
514                         ithread_mutex_unlock( &tp->mutex );
515 #ifdef WIN32
516 #ifdef PTW32_STATIC_LIB
517                         // allow static linking
518                         pthread_win32_thread_detach_np ();
519 #endif
520 #endif
521                         return NULL;
522                 } else {
523                         // Pick up persistent job if available
524                         if( tp->persistentJob ) {
525                                 job = tp->persistentJob;
526                                 tp->persistentJob = NULL;
527                                 tp->persistentThreads++;
528                                 persistent = 1;
529                                 ithread_cond_broadcast( &tp->start_and_shutdown );
530                         } else {
531                                 tp->stats.workerThreads++;
532                                 persistent = 0;
533                                 // Pick the highest priority job
534                                 if( tp->highJobQ.size > 0 ) {
535                                         head = ListHead( &tp->highJobQ );
536                                         job = ( ThreadPoolJob *) head->item;
537                                         CalcWaitTime( tp, HIGH_PRIORITY, job );
538                                         ListDelNode( &tp->highJobQ, head, 0 );
539                                 } else if( tp->medJobQ.size > 0 ) {
540                                         head = ListHead( &tp->medJobQ );
541                                         job = ( ThreadPoolJob *) head->item;
542                                         CalcWaitTime( tp, MED_PRIORITY, job );
543                                         ListDelNode( &tp->medJobQ, head, 0 );
544                                 } else if( tp->lowJobQ.size > 0 ) {
545                                         head = ListHead( &tp->lowJobQ );
546                                         job = ( ThreadPoolJob *) head->item;
547                                         CalcWaitTime( tp, LOW_PRIORITY, job );
548                                         ListDelNode( &tp->lowJobQ, head, 0 );
549                                 } else {
550                                         // Should never get here
551                                         assert( 0 );
552                                         tp->stats.workerThreads--;
553                                         tp->totalThreads--;
554                                         ithread_cond_broadcast( &tp->start_and_shutdown );
555                                         ithread_mutex_unlock( &tp->mutex );
556
557                                         return NULL;
558                                 }
559                         }
560                 }
561
562                 ithread_mutex_unlock( &tp->mutex );
563
564                 if( SetPriority( job->priority ) != 0 ) {
565                         // In the future can log
566                         // info
567                 } else {
568                         // In the future can log
569                         // info
570                 }
571
572                 // run the job
573                 job->func( job->arg );
574
575                 // return to Normal
576                 SetPriority( DEFAULT_PRIORITY );
577         }
578 }
579
580 /****************************************************************************
581  * Function: CreateThreadPoolJob
582  *
583  *  Description:
584  *      Creates a Thread Pool Job. (Dynamically allocated)
585  *      Internal to thread pool.
586  *  Parameters:
587  *      ThreadPoolJob *job - job is copied
588  *      id - id of job
589  *
590  *  Returns:
591  *      ThreadPoolJob *on success, NULL on failure.
592  *****************************************************************************/
593 static ThreadPoolJob *CreateThreadPoolJob( ThreadPoolJob *job, int id, ThreadPool *tp )
594 {
595         ThreadPoolJob *newJob = NULL;
596
597         assert( job != NULL );
598         assert( tp != NULL );
599
600         newJob = (ThreadPoolJob *)FreeListAlloc( &tp->jobFreeList );
601         if( newJob ) {
602                 *newJob = *job;
603                 newJob->jobId = id;
604                 gettimeofday( &newJob->requestTime, NULL );
605         }
606
607         return newJob;
608 }
609
610 /****************************************************************************
611  * Function: CreateWorker
612  *
613  *  Description:
614  *      Creates a worker thread, if the thread pool
615  *      does not already have max threads.
616  *      Internal to thread pool.
617  *  Parameters:
618  *      ThreadPool *tp
619  *
620  *  Returns:
621  *      0 on success, <0 on failure
622  *      EMAXTHREADS if already max threads reached
623  *      EAGAIN if system can not create thread
624  *
625  *****************************************************************************/
626 static int CreateWorker( ThreadPool *tp )
627 {
628         ithread_t temp;
629         int rc = 0;
630         int currentThreads = tp->totalThreads + 1;
631
632         assert( tp != NULL );
633
634         if ( tp->attr.maxThreads != INFINITE_THREADS &&
635              currentThreads > tp->attr.maxThreads ) {
636                 return EMAXTHREADS;
637         }
638
639         rc = ithread_create( &temp, NULL, WorkerThread, tp );
640         if( rc == 0 ) {
641                 rc = ithread_detach( temp );
642                 while( tp->totalThreads < currentThreads ) {
643                         ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
644                 }
645         }
646
647         if( tp->stats.maxThreads < tp->totalThreads ) {
648                 tp->stats.maxThreads = tp->totalThreads;
649         }
650
651         return rc;
652 }
653
654 /****************************************************************************
655  * Function: AddWorker
656  *
657  *  Description:
658  *      Determines whether or not a thread should be added
659  *      based on the jobsPerThread ratio.
660  *      Adds a thread if appropriate.
661  *      Internal to Thread Pool.
662  *  Parameters:
663  *      ThreadPool* tp
664  *
665  *****************************************************************************/
666 static void AddWorker( ThreadPool *tp )
667 {
668         int jobs = 0;
669         int threads = 0;
670
671         assert( tp != NULL );
672
673         jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
674         threads = tp->totalThreads - tp->persistentThreads;
675         while( threads == 0 || (jobs / threads) >= tp->attr.jobsPerThread ) {
676                 if( CreateWorker( tp ) != 0 ) {
677                         return;
678                 }
679                 threads++;
680         }
681 }
682
683 /****************************************************************************
684  * Function: ThreadPoolInit
685  *
686  *  Description:
687  *      Initializes and starts ThreadPool. Must be called first.
688  *      And only once for ThreadPool.
689  *  Parameters:
690  *      tp  - must be valid, non null, pointer to ThreadPool.
691  *      minWorkerThreads - minimum number of worker threads
692  *                         thread pool will never have less than this
693  *                         number of threads.
694  *      maxWorkerThreads - maximum number of worker threads
695  *                         thread pool will never have more than this
696  *                         number of threads.
697  *      maxIdleTime      - maximum time that a worker thread will spend
698  *                         idle. If a worker is idle longer than this
699  *                         time and there are more than the min
700  *                         number of workers running, than the
701  *                         worker thread exits.
702  *      jobsPerThread    - ratio of jobs to thread to try and maintain
703  *                         if a job is scheduled and the number of jobs per
704  *                         thread is greater than this number,and
705  *                         if less than the maximum number of
706  *                         workers are running then a new thread is
707  *                         started to help out with efficiency.
708  *      schedPolicy      - scheduling policy to try and set (OS dependent)
709  *  Returns:
710  *      0 on success, nonzero on failure.
711  *      EAGAIN if not enough system resources to create minimum threads.
712  *      INVALID_POLICY if schedPolicy can't be set
713  *      EMAXTHREADS if minimum threads is greater than maximum threads
714  *****************************************************************************/
715 int ThreadPoolInit( ThreadPool *tp, ThreadPoolAttr *attr )
716 {
717         int retCode = 0;
718         int i = 0;
719
720         assert( tp != NULL );
721         if( tp == NULL ) {
722                 return EINVAL;
723         }
724 #ifdef WIN32
725 #ifdef PTW32_STATIC_LIB
726         pthread_win32_process_attach_np();
727 #endif
728 #endif
729
730         retCode += ithread_mutex_init( &tp->mutex, NULL );
731         assert( retCode == 0 );
732
733         retCode += ithread_mutex_lock( &tp->mutex );
734         assert( retCode == 0 );
735
736         retCode += ithread_cond_init( &tp->condition, NULL );
737         assert( retCode == 0 );
738
739         retCode += ithread_cond_init( &tp->start_and_shutdown, NULL );
740         assert( retCode == 0 );
741
742         if( retCode != 0 ) {
743                 return EAGAIN;
744         }
745
746         if( attr ) {
747                 tp->attr = ( *attr );
748         } else {
749                 TPAttrInit( &tp->attr );
750         }
751
752         if( SetPolicyType( tp->attr.schedPolicy ) != 0 ) {
753                 ithread_mutex_unlock( &tp->mutex );
754                 ithread_mutex_destroy( &tp->mutex );
755                 ithread_cond_destroy( &tp->condition );
756                 ithread_cond_destroy( &tp->start_and_shutdown );
757                 return INVALID_POLICY;
758         }
759
760         retCode += FreeListInit(
761                 &tp->jobFreeList, sizeof( ThreadPoolJob ), JOBFREELISTSIZE );
762         assert( retCode == 0 );
763
764         StatsInit( &tp->stats );
765
766         retCode += ListInit( &tp->highJobQ, CmpThreadPoolJob, NULL );
767         assert( retCode == 0 );
768
769         retCode += ListInit( &tp->medJobQ, CmpThreadPoolJob, NULL );
770         assert( retCode == 0 );
771
772         retCode += ListInit( &tp->lowJobQ, CmpThreadPoolJob, NULL );
773         assert( retCode == 0 );
774
775         if( retCode != 0 ) {
776                 retCode = EAGAIN;
777         } else {
778                 tp->persistentJob = NULL;
779                 tp->lastJobId = 0;
780                 tp->shutdown = 0;
781                 tp->totalThreads = 0;
782                 tp->persistentThreads = 0;
783                 for( i = 0; i < tp->attr.minThreads; ++i ) {
784                         if( ( retCode = CreateWorker( tp ) ) != 0 ) {
785                                 break;
786                         }
787                 }
788         }
789
790         ithread_mutex_unlock( &tp->mutex );
791
792         if( retCode != 0 ) {
793                 // clean up if the min threads could not be created
794                 ThreadPoolShutdown( tp );
795         }
796
797         return retCode;
798 }
799
800 /****************************************************************************
801  * Function: ThreadPoolAddPersistent
802  *
803  *  Description:
804  *      Adds a long term job to the thread pool.
805  *      Job will be run as soon as possible.
806  *      Call will block until job is scheduled.
807  *  Parameters:
808  *      tp - valid thread pool pointer
809  *      job-> valid ThreadPoolJob pointer with following fields
810  *          func - ThreadFunction to run
811  *          arg - argument to function.
812  *          priority - priority of job.
813  *          free_function - function to use when freeing argument
814  *  Returns:
815  *      0 on success, nonzero on failure
816  *      EOUTOFMEM not enough memory to add job.
817  *      EMAXTHREADS not enough threads to add persistent job.
818  *****************************************************************************/
819 int ThreadPoolAddPersistent( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
820 {
821         int tempId = -1;
822         ThreadPoolJob *temp = NULL;
823
824         assert( tp != NULL );
825         assert( job != NULL );
826         if( ( tp == NULL ) || ( job == NULL ) ) {
827                 return EINVAL;
828         }
829
830         if( jobId == NULL ) {
831                 jobId = &tempId;
832         }
833
834         *jobId = INVALID_JOB_ID;
835
836         ithread_mutex_lock( &tp->mutex );
837
838         assert( job->priority == LOW_PRIORITY ||
839                 job->priority == MED_PRIORITY ||
840                 job->priority == HIGH_PRIORITY );
841
842         // Create A worker if less than max threads running
843         if( tp->totalThreads < tp->attr.maxThreads ) {
844                 CreateWorker( tp );
845         } else {
846                 // if there is more than one worker thread
847                 // available then schedule job, otherwise fail
848                 if( tp->totalThreads - tp->persistentThreads - 1 == 0 ) {
849                         ithread_mutex_unlock( &tp->mutex );
850                         return EMAXTHREADS;
851                 }
852         }
853
854         temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
855         if( temp == NULL ) {
856                 ithread_mutex_unlock( &tp->mutex );
857                 return EOUTOFMEM;
858         }
859
860         tp->persistentJob = temp;
861
862         // Notify a waiting thread
863         ithread_cond_signal( &tp->condition );
864
865         // wait until long job has been picked up
866         while( tp->persistentJob != NULL ) {
867                 ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
868         }
869
870         *jobId = tp->lastJobId++;
871         ithread_mutex_unlock( &tp->mutex );
872
873         return 0;
874 }
875
876 /****************************************************************************
877  * Function: ThreadPoolAdd
878  *
879  *  Description:
880  *      Adds a job to the thread pool.
881  *      Job will be run as soon as possible.
882  *  Parameters:
883  *      tp - valid thread pool pointer
884  *      func - ThreadFunction to run
885  *      arg - argument to function.
886  *      priority - priority of job.
887  *      jobId - id of job
888  *      duration - whether or not this is a persistent thread
889  *      free_function - function to use when freeing argument
890  *  Returns:
891  *      0 on success, nonzero on failure
892  *      EOUTOFMEM if not enough memory to add job.
893  *****************************************************************************/
894 int ThreadPoolAdd( ThreadPool *tp, ThreadPoolJob *job, int *jobId )
895 {
896         int rc = EOUTOFMEM;
897
898         int tempId = -1;
899         int totalJobs;
900
901         ThreadPoolJob *temp = NULL;
902
903         assert( tp != NULL );
904         assert( job != NULL );
905         if( ( tp == NULL ) || ( job == NULL ) ) {
906                 return EINVAL;
907         }
908
909         ithread_mutex_lock( &tp->mutex );
910
911         assert( job->priority == LOW_PRIORITY ||
912         job->priority == MED_PRIORITY ||
913         job->priority == HIGH_PRIORITY );
914
915         totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
916         if (totalJobs >= tp->attr.maxJobsTotal) {
917                 fprintf(stderr, "total jobs = %d, too many jobs", totalJobs);
918                 ithread_mutex_unlock( &tp->mutex );
919                 return rc;
920         }
921
922         if( jobId == NULL ) {
923                 jobId = &tempId;
924         }
925         *jobId = INVALID_JOB_ID;
926
927         temp = CreateThreadPoolJob( job, tp->lastJobId, tp );
928         if( temp == NULL ) {
929                 ithread_mutex_unlock( &tp->mutex );
930                 return rc;
931         }
932
933         if( job->priority == HIGH_PRIORITY ) {
934                 if( ListAddTail( &tp->highJobQ, temp ) ) {
935                         rc = 0;
936                 }
937         } else if( job->priority == MED_PRIORITY ) {
938                 if( ListAddTail( &tp->medJobQ, temp ) ) {
939                         rc = 0;
940                 }
941         } else {
942                 if( ListAddTail( &tp->lowJobQ, temp ) ) {
943                         rc = 0;
944                 }
945         }
946
947         // AddWorker if appropriate
948         AddWorker( tp );
949
950         // Notify a waiting thread
951         if( rc == 0 ) {
952                 ithread_cond_signal( &tp->condition );
953         } else {
954                 FreeThreadPoolJob( tp, temp );
955         }
956
957         *jobId = tp->lastJobId++;
958
959         ithread_mutex_unlock( &tp->mutex );
960
961         return rc;
962 }
963
964 /****************************************************************************
965  * Function: ThreadPoolRemove
966  *
967  *  Description:
968  *      Removes a job from the thread pool.
969  *      Can only remove jobs which are not
970  *      currently running.
971  *  Parameters:
972  *      tp - valid thread pool pointer
973  *      jobId - id of job
974  *      ThreadPoolJob *out - space for removed job.
975  *                           Can be null if not needed.
976  *
977  *  Returns:
978  *      0 on success. INVALID_JOB_ID on failure.
979  *****************************************************************************/
980 int ThreadPoolRemove( ThreadPool *tp, int jobId, ThreadPoolJob *out )
981 {
982         ThreadPoolJob *temp = NULL;
983         int ret = INVALID_JOB_ID;
984         ListNode *tempNode = NULL;
985         ThreadPoolJob dummy;
986
987         assert( tp != NULL );
988         if( tp == NULL ) {
989                 return EINVAL;
990         }
991
992         if( out == NULL ) {
993                 out = &dummy;
994         }
995
996         dummy.jobId = jobId;
997
998         ithread_mutex_lock( &tp->mutex );
999
1000         tempNode = ListFind( &tp->highJobQ, NULL, &dummy );
1001         if( tempNode ) {
1002                 temp = (ThreadPoolJob *)tempNode->item;
1003                 *out = *temp;
1004                 ListDelNode( &tp->highJobQ, tempNode, 0 );
1005                 FreeThreadPoolJob( tp, temp );
1006                 ithread_mutex_unlock( &tp->mutex );
1007
1008                 return 0;
1009         }
1010
1011         tempNode = ListFind( &tp->medJobQ, NULL, &dummy );
1012         if( tempNode ) {
1013                 temp = (ThreadPoolJob *)tempNode->item;
1014                 *out = *temp;
1015                 ListDelNode( &tp->medJobQ, tempNode, 0 );
1016                 FreeThreadPoolJob( tp, temp );
1017                 ithread_mutex_unlock( &tp->mutex );
1018
1019                 return 0;
1020         }
1021
1022         tempNode = ListFind( &tp->lowJobQ, NULL, &dummy );
1023         if( tempNode ) {
1024                 temp = (ThreadPoolJob *)tempNode->item;
1025                 *out = *temp;
1026                 ListDelNode( &tp->lowJobQ, tempNode, 0 );
1027                 FreeThreadPoolJob( tp, temp );
1028                 ithread_mutex_unlock( &tp->mutex );
1029
1030                 return 0;
1031         }
1032
1033         if( tp->persistentJob && tp->persistentJob->jobId == jobId ) {
1034                 *out = *tp->persistentJob;
1035                 FreeThreadPoolJob( tp, tp->persistentJob );
1036                 tp->persistentJob = NULL;
1037                 ithread_mutex_unlock( &tp->mutex );
1038
1039                 return 0;
1040         }
1041
1042         ithread_mutex_unlock( &tp->mutex );
1043
1044         return ret;
1045 }
1046
1047 /****************************************************************************
1048  * Function: ThreadPoolGetAttr
1049  *
1050  *  Description:
1051  *      Gets the current set of attributes
1052  *      associated with the thread pool.
1053  *  Parameters:
1054  *      tp - valid thread pool pointer
1055  *      out - non null pointer to store attributes
1056  *  Returns:
1057  *      0 on success, nonzero on failure
1058  *      Always returns 0.
1059  *****************************************************************************/
1060 int ThreadPoolGetAttr( ThreadPool *tp, ThreadPoolAttr *out )
1061 {
1062         assert( tp != NULL );
1063         assert( out != NULL );
1064         if( tp == NULL || out == NULL ) {
1065                 return EINVAL;
1066         }
1067
1068         if( !tp->shutdown ) {
1069                 ithread_mutex_lock( &tp->mutex );
1070         }
1071
1072         *out = tp->attr;
1073
1074         if( !tp->shutdown ) {
1075                 ithread_mutex_unlock( &tp->mutex );
1076         }
1077
1078         return 0;
1079 }
1080
1081 /****************************************************************************
1082  * Function: ThreadPoolSetAttr
1083  *
1084  *  Description:
1085  *      Sets the attributes for the thread pool.
1086  *      Only affects future calculations.
1087  *  Parameters:
1088  *      tp - valid thread pool pointer
1089  *      attr - pointer to attributes, null sets attributes to default.
1090  *  Returns:
1091  *      0 on success, nonzero on failure
1092  *      Returns INVALID_POLICY if policy can not be set.
1093  *****************************************************************************/
1094 int ThreadPoolSetAttr( ThreadPool *tp, ThreadPoolAttr *attr )
1095 {
1096         int retCode = 0;
1097         ThreadPoolAttr temp;
1098         int i = 0;
1099
1100         assert( tp != NULL );
1101         if( tp == NULL ) {
1102                 return EINVAL;
1103         }
1104         ithread_mutex_lock( &tp->mutex );
1105
1106         if( attr != NULL ) {
1107                 temp = ( *attr );
1108         } else {
1109                 TPAttrInit( &temp );
1110         }
1111
1112         if( SetPolicyType( temp.schedPolicy ) != 0 ) {
1113                 ithread_mutex_unlock( &tp->mutex );
1114
1115                 return INVALID_POLICY;
1116         }
1117
1118         tp->attr = ( temp );
1119
1120         // add threads
1121         if( tp->totalThreads < tp->attr.minThreads )
1122         {
1123                 for( i = tp->totalThreads; i < tp->attr.minThreads; i++ ) {
1124                         if( ( retCode = CreateWorker( tp ) ) != 0 ) {
1125                                 break;
1126                         }
1127                 }
1128         }
1129
1130         // signal changes 
1131         ithread_cond_signal( &tp->condition ); 
1132         ithread_mutex_unlock( &tp->mutex );
1133
1134         if( retCode != 0 ) {
1135                 // clean up if the min threads could not be created
1136                 ThreadPoolShutdown( tp );
1137         }
1138
1139         return retCode;
1140 }
1141
1142 /****************************************************************************
1143  * Function: ThreadPoolShutdown
1144  *
1145  *  Description:
1146  *      Shuts the thread pool down.
1147  *      Waits for all threads to finish.
1148  *      May block indefinitely if jobs do not
1149  *      exit.
1150  *  Parameters:
1151  *      tp - must be valid tp
1152  *  Returns:
1153  *      0 on success, nonzero on failure
1154  *      Always returns 0.
1155  *****************************************************************************/
1156 int ThreadPoolShutdown( ThreadPool *tp )
1157 {
1158         ListNode *head = NULL;
1159         ThreadPoolJob *temp = NULL;
1160
1161         assert( tp != NULL );
1162         if( tp == NULL ) {
1163                 return EINVAL;
1164         }
1165
1166         ithread_mutex_lock( &tp->mutex );
1167
1168         // clean up high priority jobs
1169         while( tp->highJobQ.size ) {
1170                 head = ListHead( &tp->highJobQ );
1171                 temp = ( ThreadPoolJob *) head->item;
1172                 if( temp->free_func ) {
1173                         temp->free_func( temp->arg );
1174                 }
1175                 FreeThreadPoolJob( tp, temp );
1176                 ListDelNode( &tp->highJobQ, head, 0 );
1177         }
1178         ListDestroy( &tp->highJobQ, 0 );
1179
1180         // clean up med priority jobs
1181         while( tp->medJobQ.size ) {
1182                 head = ListHead( &tp->medJobQ );
1183                 temp = ( ThreadPoolJob *) head->item;
1184                 if( temp->free_func ) {
1185                         temp->free_func( temp->arg );
1186                 }
1187                 FreeThreadPoolJob( tp, temp );
1188                 ListDelNode( &tp->medJobQ, head, 0 );
1189         }
1190         ListDestroy( &tp->medJobQ, 0 );
1191
1192         // clean up low priority jobs
1193                 while( tp->lowJobQ.size ) {
1194                 head = ListHead( &tp->lowJobQ );
1195                 temp = ( ThreadPoolJob *) head->item;
1196                 if( temp->free_func ) {
1197                         temp->free_func( temp->arg );
1198                 }
1199                 FreeThreadPoolJob( tp, temp );
1200                 ListDelNode( &tp->lowJobQ, head, 0 );
1201         }
1202         ListDestroy( &tp->lowJobQ, 0 );
1203
1204         // clean up long term job
1205         if( tp->persistentJob ) {
1206                 temp = tp->persistentJob;
1207                 if( temp->free_func ) {
1208                         temp->free_func( temp->arg );
1209                 }
1210                 FreeThreadPoolJob( tp, temp );
1211                 tp->persistentJob = NULL;
1212         }
1213
1214         // signal shutdown
1215         tp->shutdown = 1;
1216         ithread_cond_broadcast( &tp->condition );
1217
1218         // wait for all threads to finish
1219         while( tp->totalThreads > 0 ) {
1220                 ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
1221         }
1222
1223         // destroy condition
1224         while( ithread_cond_destroy( &tp->condition ) != 0 ) {
1225         }
1226         while( ithread_cond_destroy( &tp->start_and_shutdown ) != 0 ) {
1227         }
1228
1229         FreeListDestroy( &tp->jobFreeList );
1230
1231         ithread_mutex_unlock( &tp->mutex );
1232
1233         // destroy mutex
1234         while( ithread_mutex_destroy( &tp->mutex ) != 0 ) {
1235         }
1236
1237         return 0;
1238 }
1239
1240 /****************************************************************************
1241  * Function: TPAttrInit
1242  *
1243  *  Description:
1244  *      Initializes thread pool attributes.
1245  *      Sets values to defaults defined in ThreadPool.h.
1246  *  Parameters:
1247  *      attr - must be valid thread pool attributes.
1248  *  Returns:
1249  *      Always returns 0.
1250  *****************************************************************************/
1251 int TPAttrInit( ThreadPoolAttr *attr )
1252 {
1253         assert( attr != NULL );
1254         if( attr == NULL ) {
1255                 return EINVAL;
1256         }
1257
1258         attr->jobsPerThread  = DEFAULT_JOBS_PER_THREAD;
1259         attr->maxIdleTime    = DEFAULT_IDLE_TIME;
1260         attr->maxThreads     = DEFAULT_MAX_THREADS;
1261         attr->minThreads     = DEFAULT_MIN_THREADS;
1262         attr->schedPolicy    = DEFAULT_POLICY;
1263         attr->starvationTime = DEFAULT_STARVATION_TIME;
1264         attr->maxJobsTotal   = DEFAULT_MAX_JOBS_TOTAL;
1265
1266         return 0;
1267 }
1268
1269 /****************************************************************************
1270  * Function: TPJobInit
1271  *
1272  *  Description:
1273  *      Initializes thread pool job.
1274  *      Sets the priority to default defined in ThreadPool.h.
1275  *      Sets the free_routine to default defined in ThreadPool.h
1276  *  Parameters:
1277  *      ThreadPoolJob *job - must be valid thread pool attributes.
1278  *      start_routine func - function to run, must be valid
1279  *      void * arg - argument to pass to function.
1280  *  Returns:
1281  *      Always returns 0.
1282  *****************************************************************************/
1283 int TPJobInit( ThreadPoolJob *job, start_routine func, void *arg )
1284 {
1285         assert( job != NULL );
1286         assert( func != NULL );
1287         if( job == NULL || func == NULL ) {
1288                 return EINVAL;
1289         }
1290
1291         job->func = func;
1292         job->arg = arg;
1293         job->priority = DEFAULT_PRIORITY;
1294         job->free_func = DEFAULT_FREE_ROUTINE;
1295
1296         return 0;
1297 }
1298
1299 /****************************************************************************
1300  * Function: TPJobSetPriority
1301  *
1302  *  Description:
1303  *      Sets the max threads for the thread pool attributes.
1304  *  Parameters:
1305  *      attr - must be valid thread pool attributes.
1306  *      maxThreads - value to set
1307  *  Returns:
1308  *      Returns 0 on success nonzero on failure.
1309  *      Returns EINVAL if invalid priority.
1310  *****************************************************************************/
1311 int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority )
1312 {
1313         assert( job != NULL );
1314         if( job == NULL ) {
1315                 return EINVAL;
1316         }
1317
1318         if( priority == LOW_PRIORITY ||
1319             priority == MED_PRIORITY ||
1320             priority == HIGH_PRIORITY ) {
1321                 job->priority = priority;
1322                 return 0;
1323         } else {
1324                 return EINVAL;
1325         }
1326 }
1327
1328 /****************************************************************************
1329  * Function: TPJobSetFreeFunction
1330  *
1331  *  Description:
1332  *      Sets the max threads for the thread pool attributes.
1333  *  Parameters:
1334  *      attr - must be valid thread pool attributes.
1335  *      maxThreads - value to set
1336  *  Returns:
1337  *      Always returns 0.
1338  *****************************************************************************/
1339 int TPJobSetFreeFunction( ThreadPoolJob *job, free_routine func )
1340 {
1341         assert( job != NULL );
1342         if( job == NULL ) {
1343                 return EINVAL;
1344         }
1345
1346         job->free_func = func;
1347
1348         return 0;
1349 }
1350
1351 /****************************************************************************
1352  * Function: TPAttrSetMaxThreads
1353  *
1354  *  Description:
1355  *      Sets the max threads for the thread pool attributes.
1356  *  Parameters:
1357  *      attr - must be valid thread pool attributes.
1358  *      maxThreads - value to set
1359  *  Returns:
1360  *      Always returns 0.
1361  *****************************************************************************/
1362 int TPAttrSetMaxThreads( ThreadPoolAttr *attr, int maxThreads )
1363 {
1364         assert( attr != NULL );
1365         if( attr == NULL ) {
1366                 return EINVAL;
1367         }
1368
1369         attr->maxThreads = maxThreads;
1370
1371         return 0;
1372 }
1373
1374 /****************************************************************************
1375  * Function: TPAttrSetMinThreads
1376  *
1377  *  Description:
1378  *      Sets the min threads for the thread pool attributes.
1379  *  Parameters:
1380  *      attr - must be valid thread pool attributes.
1381  *      minThreads - value to set
1382  *  Returns:
1383  *      Always returns 0.
1384  *****************************************************************************/
1385 int TPAttrSetMinThreads( ThreadPoolAttr *attr, int minThreads )
1386 {
1387         assert( attr != NULL );
1388         if( attr == NULL ) {
1389                 return EINVAL;
1390         }
1391
1392         attr->minThreads = minThreads;
1393
1394         return 0;
1395 }
1396
1397 /****************************************************************************
1398  * Function: TPAttrSetIdleTime
1399  *
1400  *  Description:
1401  *      Sets the idle time for the thread pool attributes.
1402  *  Parameters:
1403  *      attr - must be valid thread pool attributes.
1404  *  Returns:
1405  *      Always returns 0.
1406  *****************************************************************************/
1407 int TPAttrSetIdleTime( ThreadPoolAttr *attr, int idleTime )
1408 {
1409         assert( attr != NULL );
1410         if( attr == NULL ) {
1411                 return EINVAL;
1412         }
1413
1414         attr->maxIdleTime = idleTime;
1415
1416         return 0;
1417 }
1418
1419 /****************************************************************************
1420  * Function: TPAttrSetJobsPerThread
1421  *
1422  *  Description:
1423  *      Sets the max thre
1424  *  Parameters:
1425  *      attr - must be valid thread pool attributes.
1426  *  Returns:
1427  *      Always returns 0.
1428  *****************************************************************************/
1429 int TPAttrSetJobsPerThread( ThreadPoolAttr *attr, int jobsPerThread )
1430 {
1431         assert( attr != NULL );
1432         if( attr == NULL ) {
1433                 return EINVAL;
1434         }
1435
1436         attr->jobsPerThread = jobsPerThread;
1437
1438         return 0;
1439 }
1440
1441 /****************************************************************************
1442  * Function: TPAttrSetStarvationTime
1443  *
1444  *  Description:
1445  *      Sets the starvation time for the thread pool attributes.
1446  *  Parameters:
1447  *      attr - must be valid thread pool attributes.
1448  *  Returns:
1449  *      Always returns 0.
1450  *****************************************************************************/
1451 int TPAttrSetStarvationTime( ThreadPoolAttr *attr, int starvationTime )
1452 {
1453         assert( attr != NULL );
1454         if( attr == NULL ) {
1455                 return EINVAL;
1456         }
1457
1458         attr->starvationTime = starvationTime;
1459
1460         return 0;
1461 }
1462
1463 /****************************************************************************
1464  * Function: TPAttrSetSchedPolicy
1465  *
1466  *  Description:
1467  *      Sets the scheduling policy for the thread pool attributes.
1468  *  Parameters:
1469  *      attr - must be valid thread pool attributes.
1470  *      PolicyType schedPolicy - must be a valid policy type.
1471  *  Returns:
1472  *      Always returns 0.
1473  *****************************************************************************/
1474 int TPAttrSetSchedPolicy( ThreadPoolAttr *attr, PolicyType schedPolicy )
1475 {
1476         assert( attr != NULL );
1477         if( attr == NULL ) {
1478                 return EINVAL;
1479         }
1480
1481         attr->schedPolicy = schedPolicy;
1482
1483         return 0;
1484 }
1485
1486 /****************************************************************************
1487  * Function: TPAttrSetMaxJobsTotal
1488  *
1489  *  Description:
1490  *      Sets the maximum number jobs that can be qeued totally.
1491  *  Parameters:
1492  *      attr - must be valid thread pool attributes.
1493  *      maxJobsTotal - maximum number of jobs
1494  *  Returns:
1495  *      Always returns 0.
1496  *****************************************************************************/
1497 int TPAttrSetMaxJobsTotal( ThreadPoolAttr *attr, int  maxJobsTotal )
1498 {
1499         assert( attr != NULL );
1500         if( attr == NULL ) {
1501                 return EINVAL;
1502         }
1503
1504         attr->maxJobsTotal = maxJobsTotal;
1505
1506         return 0;
1507 }
1508
1509 #ifdef STATS
1510 void ThreadPoolPrintStats(ThreadPoolStats *stats)
1511 {
1512         assert( stats != NULL );
1513         if (stats == NULL) {
1514                 return;
1515         }
1516
1517 #ifdef __FreeBSD__
1518         printf("ThreadPoolStats at Time: %d\n", StatsTime(NULL));
1519 #else /* __FreeBSD__ */
1520         printf("ThreadPoolStats at Time: %ld\n", StatsTime(NULL));
1521 #endif /* __FreeBSD__ */
1522         printf("High Jobs pending: %d\n", stats->currentJobsHQ);
1523         printf("Med Jobs Pending: %d\n", stats->currentJobsMQ);
1524         printf("Low Jobs Pending: %d\n", stats->currentJobsLQ);
1525         printf("Average Wait in High Priority Q in milliseconds: %f\n", stats->avgWaitHQ);
1526         printf("Average Wait in Med Priority Q in milliseconds: %f\n", stats->avgWaitMQ);
1527         printf("Averate Wait in Low Priority Q in milliseconds: %f\n", stats->avgWaitLQ);
1528         printf("Max Threads Active: %d\n", stats->maxThreads);
1529         printf("Current Worker Threads: %d\n", stats->workerThreads);
1530         printf("Current Persistent Threads: %d\n", stats->persistentThreads);
1531         printf("Current Idle Threads: %d\n", stats->idleThreads);
1532         printf("Total Threads : %d\n", stats->totalThreads);
1533         printf("Total Time spent Working in seconds: %f\n", stats->totalWorkTime);
1534         printf("Total Time spent Idle in seconds : %f\n", stats->totalIdleTime);
1535 }
1536 #endif /* STATS */
1537
1538 /****************************************************************************
1539  * Function: ThreadPoolGetStats
1540  *
1541  *  Description:
1542  *      Returns various statistics about the
1543  *      thread pool.
1544  *      Only valid if STATS has been defined.
1545  *  Parameters:
1546  *      ThreadPool *tp - valid initialized threadpool
1547  *      ThreadPoolStats *stats - valid stats, out parameter
1548  *  Returns:
1549  *      Always returns 0.
1550  *****************************************************************************/
1551 #ifdef STATS
1552 int ThreadPoolGetStats( ThreadPool *tp, ThreadPoolStats *stats )
1553 {
1554         assert(tp != NULL);
1555         assert(stats != NULL);
1556         if (tp == NULL || stats == NULL) {
1557                 return EINVAL;
1558         }
1559
1560         //if not shutdown then acquire mutex
1561         if (!tp->shutdown) {
1562                 ithread_mutex_lock(&tp->mutex);
1563         }
1564
1565         *stats = tp->stats;
1566         if (stats->totalJobsHQ > 0) {
1567                 stats->avgWaitHQ = stats->totalTimeHQ / stats->totalJobsHQ;
1568         } else {
1569                 stats->avgWaitHQ = 0;
1570         }
1571         
1572         if( stats->totalJobsMQ > 0 ) {
1573                 stats->avgWaitMQ = stats->totalTimeMQ / stats->totalJobsMQ;
1574         } else {
1575                 stats->avgWaitMQ = 0;
1576         }
1577         
1578         if( stats->totalJobsLQ > 0 ) {
1579                 stats->avgWaitLQ = stats->totalTimeLQ / stats->totalJobsLQ;
1580         } else {
1581                 stats->avgWaitLQ = 0;
1582         }
1583
1584         stats->totalThreads = tp->totalThreads;
1585         stats->persistentThreads = tp->persistentThreads;
1586         stats->currentJobsHQ = ListSize( &tp->highJobQ );
1587         stats->currentJobsLQ = ListSize( &tp->lowJobQ );
1588         stats->currentJobsMQ = ListSize( &tp->medJobQ );
1589
1590         //if not shutdown then release mutex
1591         if( !tp->shutdown ) {
1592                 ithread_mutex_unlock( &tp->mutex );
1593         }
1594
1595         return 0;
1596 }
1597
1598 #endif /* STATS */
1599
1600 #ifdef WIN32
1601 #if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
1602         #define DELTA_EPOCH_IN_MICROSECS  11644473600000000Ui64
1603 #else
1604         #define DELTA_EPOCH_IN_MICROSECS  11644473600000000ULL
1605 #endif
1606  
1607 int gettimeofday(struct timeval *tv, struct timezone *tz)
1608 {
1609     FILETIME ft;
1610     unsigned __int64 tmpres = 0;
1611     static int tzflag;
1612
1613     if (NULL != tv)
1614     {
1615         GetSystemTimeAsFileTime(&ft);
1616
1617         tmpres |= ft.dwHighDateTime;
1618         tmpres <<= 32;
1619         tmpres |= ft.dwLowDateTime;
1620
1621         /*converting file time to unix epoch*/
1622         tmpres /= 10;  /*convert into microseconds*/
1623         tmpres -= DELTA_EPOCH_IN_MICROSECS; 
1624         tv->tv_sec = (long)(tmpres / 1000000UL);
1625         tv->tv_usec = (long)(tmpres % 1000000UL);
1626     }
1627
1628     if (NULL != tz)
1629     {
1630         if (!tzflag)
1631         {
1632             _tzset();
1633             tzflag++;
1634         }
1635         tz->tz_minuteswest = _timezone / 60;
1636         tz->tz_dsttime = _daylight;
1637     }
1638
1639     return 0;
1640 }
1641 #endif /* WIN32 */