source: trunk/FACT++/dim_v19r19/src/dtq.c@ 10480

Last change on this file since 10480 was 10480, checked in by tbretz, 14 years ago
New version V19r19 with some important bug fixes.
File size: 16.5 KB
Line 
/*
 * DTQ (Delphi Timer Queue) implements the action scheduling for the DIM
 * (Delphi Information Management) System.
 * It will be used by servers, clients and the Name Server.
 *
 * Start date      : 10-11-91
 * Written by      : C. Gaspar
 * UNIX adjustment : G.C. Ballintijn
 */
10
11/* include files */
12#include <signal.h>
13#include <stdio.h>
14#define DIMLIB
15#include <dim.h>
16
17#ifdef VxWorks
18#include <time.h>
19#endif
20
21#include <sys/timeb.h>
22
23/* global definitions */
/*
 * Layout of the timer_queues[] table: MAX_TIMER_QUEUES regular slots,
 * followed by two internal slots.
 */
#define MAX_TIMER_QUEUES 16                     /* number of regular queue slots */
#define SPECIAL_QUEUE    (MAX_TIMER_QUEUES)     /* slot used for timers started without a queue (time != 0) */
#define WRITE_QUEUE      (MAX_TIMER_QUEUES + 1) /* slot used for timers started with time == 0 */
27
28_DIM_PROTO( static void alrm_sig_handler, (int num) );
29_DIM_PROTO( static void Std_timer_handler, () );
30_DIM_PROTO( static int stop_it, (int new_time) );
31_DIM_PROTO( static int start_it, (int new_time) );
32_DIM_PROTO( static int scan_it, () );
33_DIM_PROTO( static int get_minimum, (int deltat) );
34_DIM_PROTO( int dtq_task, (void *dummy) );
35_DIM_PROTO( static int my_alarm, (int secs) );
36_DIM_PROTO( int dim_dtq_init, (int thr_flag) );
37#ifndef WIN32
38_DIM_PROTO( static void dummy_alrm_sig_handler, (int num) );
39#endif
40
41typedef struct {
42 TIMR_ENT *queue_head;
43 int remove_entries;
44} QUEUE_ENT;
45
46
47static QUEUE_ENT timer_queues[MAX_TIMER_QUEUES + 2] = {
48 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0},
49 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}
50};
51
/* Non-zero while scan_it() walks a regular queue; dtq_rem_entry() then only marks entries. */
static int Inside_ast = 0;
/* Non-zero between start_it() arming the alarm and stop_it() cancelling it. */
static int Alarm_runs = 0;
/* Non-zero once dim_dtq_init() has completed; cleared again by dim_dtq_stop(). */
static int sigvec_done = 0;

#ifdef VxWorks
static timer_t Timer_id;
#endif

/* Reference time stamp (seconds and milliseconds part) taken by start_it()/stop_it(); 0 means "not set". */
static time_t DIM_last_time = 0;
static int DIM_last_time_millies = 0;
/* Delay requested through my_alarm(), picked up by dtq_task(); 0 means "no new request". */
static int DIM_next_time = 0;
/* Remaining delay as maintained by dtq_task(). */
static int DIM_time_left = 0;
/* Non-zero when the code runs without the timer thread (see dim_no_threads()/dim_dtq_init()). */
static int Threads_off = 0;
65
66/*
67 * DTQ routines
68 */
69
70
71#ifndef WIN32
72
73void dim_no_threads()
74{
75 extern void dic_no_threads();
76 extern void dis_no_threads();
77
78 Threads_off = 1;
79 dic_no_threads();
80 dis_no_threads();
81}
82
83int dim_dtq_init(int thr_flag)
84{
85struct sigaction sig_info;
86sigset_t set;
87int pid, ret = 0;
88
89 pid = getpid();
90 if( !sigvec_done)
91 {
92 Inside_ast = 0;
93 Alarm_runs = 0;
94 DIM_last_time = 0;
95/*
96 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
97 {
98 timer_queues[i].queue_head = 0;
99 timer_queues[i].remove_entries = 0;
100 }
101*/
102 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
103 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
104 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
105 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
106 }
107 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
108 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
109 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
110 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
111 }
112 if(!thr_flag)
113 {
114 Threads_off = 1;
115 }
116 sigemptyset(&set);
117
118 sigaddset(&set,SIGIO);
119
120 if(thr_flag)
121 sig_info.sa_handler = dummy_alrm_sig_handler;
122 else
123 sig_info.sa_handler = alrm_sig_handler;
124 sig_info.sa_mask = set;
125#ifndef LYNXOS
126 sig_info.sa_flags = SA_RESTART;
127#else
128 sig_info.sa_flags = 0;
129#endif
130 if( sigaction(SIGALRM, &sig_info, 0) < 0 ) {
131 perror( "sigaction(SIGALRM)" );
132 exit(1);
133 }
134
135 sigvec_done = 1;
136 ret = 1;
137 }
138 return(ret);
139}
140
/*
 * SIGALRM handler that does nothing; dim_dtq_init() installs it when it is
 * called with a non-zero thr_flag.
 */
void dummy_alrm_sig_handler( int num )
{
    (void)num;  /* signal number is not needed */
}
145
146#else
147
148int dim_dtq_init(int thr_flag)
149{
150 int tid = 1;
151 void create_alrm_thread(void);
152
153 if( !sigvec_done ) {
154 Inside_ast = 0;
155 Alarm_runs = 0;
156 DIM_last_time = 0;
157/*
158 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
159 {
160 timer_queues[i].queue_head = 0;
161 timer_queues[i].remove_entries = 0;
162 }
163*/
164 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
165 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
166 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
167 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
168 }
169 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
170 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
171 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
172 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
173 }
174/*
175#ifndef STDCALL
176 tid = _beginthread((void *)(void *)dtq_task,0,NULL);
177#else
178 tid = _beginthreadex(NULL, NULL,
179 dtq_task,0,0,NULL);
180#endif
181*/
182 create_alrm_thread();
183 sigvec_done = 1;
184 }
185 return(tid);
186}
187
188#endif
189
190void dim_dtq_stop()
191{
192/*
193 int i;
194 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
195 {
196 if( timer_queues[i].queue_head != NULL)
197 {
198 dtq_delete(i);
199 free((TIMR_ENT *)timer_queues[i].queue_head);
200 timer_queues[i].queue_head = 0;
201 }
202 }
203*/
204 scan_it();
205 if( timer_queues[WRITE_QUEUE].queue_head != NULL)
206 {
207 dtq_delete(WRITE_QUEUE);
208 free((TIMR_ENT *)timer_queues[WRITE_QUEUE].queue_head);
209 timer_queues[WRITE_QUEUE].queue_head = 0;
210 }
211 sigvec_done = 0;
212}
213
214static int get_current_time(int *millies)
215{
216 int secs;
217#ifdef WIN32
218 struct timeb timebuf;
219#else
220 struct timeval tv;
221 struct timezone *tz;
222#endif
223
224#ifdef WIN32
225 ftime(&timebuf);
226 secs = (int)timebuf.time;
227 *millies = timebuf.millitm;
228#else
229 tz = 0;
230 gettimeofday(&tv, tz);
231 secs = tv.tv_sec;
232 *millies = tv.tv_usec / 1000;
233#endif
234 return secs;
235}
236
237static int get_elapsed_time()
238{
239 int millies, deltat;
240 int now;
241
242 now = get_current_time(&millies);
243 deltat = now - (int)DIM_last_time;
244 if((millies + 50) < DIM_last_time_millies)
245 {
246 deltat --;
247 }
248 return deltat;
249}
250
251static int my_alarm(int secs)
252{
253 int ret;
254
255 DIM_next_time = secs;
256#ifndef WIN32
257 if(Threads_off)
258 {
259 if( secs < 0)
260 {
261 kill(getpid(),SIGALRM);
262 return(0);
263 }
264 else
265 {
266 return(alarm(secs));
267 }
268 }
269 else
270 {
271#endif
272
273 ret = DIM_time_left;
274
275 if(secs == 0)
276 DIM_next_time = -1;
277 return(ret);
278#ifndef WIN32
279 }
280#endif
281}
282
283void dim_usleep(int usecs)
284{
285
286#ifndef WIN32
287 struct timeval timeout;
288
289 timeout.tv_sec = 0;
290 timeout.tv_usec = usecs;
291 select(FD_SETSIZE, NULL, NULL, NULL, &timeout);
292#else
293 usleep(usecs);
294#endif
295}
296
297int dtq_task(void *dummy)
298{
299int deltat;
300static int to_go;
301
302 if(dummy){}
303 while(1)
304 {
305 if(DIM_next_time)
306 {
307 DISABLE_AST
308 DIM_time_left = DIM_next_time;
309 if(DIM_time_left == -1)
310 DIM_time_left = 0;
311 to_go = DIM_next_time;
312 DIM_next_time = 0;
313 ENABLE_AST
314 }
315 if(DIM_time_left < 0)
316 {
317 DIM_time_left = 0;
318 alrm_sig_handler(2);
319#ifndef WIN32
320 return(1);
321#endif
322 }
323 else if(DIM_time_left > 0)
324 {
325 dim_usleep(100000);
326 deltat = get_elapsed_time();
327 DIM_time_left = to_go - deltat;
328 if(DIM_time_left <= 0)
329 {
330 alrm_sig_handler(2);
331#ifndef WIN32
332 return(1);
333#endif
334 }
335 }
336 else
337 {
338 dim_usleep(1000);
339 }
340 }
341}
342
343int dtq_create()
344{
345 int i;
346 extern void dim_init_threads(void);
347
348 if(!Threads_off)
349 {
350 dim_init_threads();
351 }
352 dim_dtq_init(0);
353 for( i = 1; i < MAX_TIMER_QUEUES; i++ )
354 if( timer_queues[i].queue_head == 0 )
355 break;
356
357 if( i == MAX_TIMER_QUEUES )
358 return(0);
359
360 timer_queues[i].queue_head = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
361 memset( timer_queues[i].queue_head, 0, sizeof(TIMR_ENT) );
362 dll_init( (DLL *)timer_queues[i].queue_head);
363
364 return(i);
365}
366
367
368int dtq_delete(int queue_id)
369{
370 TIMR_ENT *queue_head, *entry;
371
372 DISABLE_AST
373 queue_head = timer_queues[queue_id].queue_head;
374 if(queue_head)
375 {
376 while(!dll_empty((DLL *)queue_head))
377 {
378 entry = queue_head->next;
379 dll_remove(entry);
380 free(entry);
381 }
382 free(queue_head);
383 timer_queues[queue_id].queue_head = 0;
384 }
385 ENABLE_AST
386 return(1);
387}
388
389TIMR_ENT *dtq_add_entry(int queue_id, int time, void (*user_routine)(), long tag)
390{
391 TIMR_ENT *new_entry, *queue_head, *auxp, *prevp;
392 int next_time, min_time = 100000;
393 int time_left, deltat = 0;
394
395 DISABLE_AST
396
397 next_time = time;
398 if(!next_time)
399 next_time = -10;
400 if(Alarm_runs)
401 {
402 time_left = DIM_time_left;
403 if(!time_left)
404 time_left = DIM_next_time;
405 if((time_left > next_time) || (queue_id == SPECIAL_QUEUE))
406 {
407 if(next_time != -10)
408 {
409 min_time = stop_it();
410 if((next_time > min_time) && (min_time != 0))
411 next_time = min_time;
412 }
413 else
414 my_alarm(next_time);
415 }
416 else
417 {
418 deltat = get_elapsed_time();
419 }
420 }
421 new_entry = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
422 new_entry->time = time;
423 if( user_routine )
424 new_entry->user_routine = user_routine;
425 else
426 new_entry->user_routine = Std_timer_handler;
427 new_entry->tag = tag;
428 new_entry->time_left = time + deltat;
429
430 queue_head = timer_queues[queue_id].queue_head;
431 if(!time)
432 {
433 dll_insert_after((DLL *)queue_head->prev, (DLL *)new_entry);
434 }
435 else
436 {
437 if(queue_head)
438 {
439 auxp = queue_head;
440 prevp = auxp;
441 while((auxp = (TIMR_ENT *)dll_get_prev((DLL *)queue_head, (DLL *)auxp)))
442 {
443 if(time >= auxp->time)
444 {
445 break;
446 }
447 prevp = auxp;
448 }
449/*
450 if(auxp)
451 {
452 if(queue_id != SPECIAL_QUEUE)
453 {
454 if(auxp->time_left > 0)
455 {
456 if(auxp->time == time)
457 new_entry->time_left = auxp->time_left;
458 }
459 }
460 prevp = auxp;
461 }
462*/
463 dll_insert_after((DLL *)prevp, (DLL *)new_entry);
464 }
465 }
466 if(!Alarm_runs)
467 {
468 if((next_time != -10) && (min_time == 100000))
469 {
470 min_time = get_minimum(0);
471 if(next_time > min_time)
472 next_time = min_time;
473 }
474 start_it(next_time);
475 }
476 ENABLE_AST
477 return(new_entry);
478}
479
480int dtq_clear_entry(TIMR_ENT *entry)
481{
482 int time_left, deltat = 0;
483
484 DISABLE_AST
485 deltat = get_elapsed_time();
486 time_left = entry->time_left - deltat;
487 entry->time_left = entry->time + deltat;
488 ENABLE_AST
489 return(time_left);
490}
491
492
493int dtq_rem_entry(int queue_id, TIMR_ENT *entry)
494{
495 int time_left, deltat = 0;
496
497 DISABLE_AST
498 deltat = get_elapsed_time();
499 time_left = entry->time_left - deltat;
500 if( Inside_ast )
501 {
502 timer_queues[queue_id].remove_entries++;
503 entry->time = -1;
504 ENABLE_AST
505 return(time_left);
506 }
507 dll_remove(entry);
508 free(entry);
509
510 ENABLE_AST
511 return(time_left);
512}
513
514static int rem_deleted_entries(int queue_id)
515{
516 TIMR_ENT *auxp, *prevp, *queue_head;
517 int n;
518
519 DISABLE_AST
520 queue_head = timer_queues[queue_id].queue_head;
521 n = timer_queues[queue_id].remove_entries;
522 if(queue_head)
523 {
524 auxp = queue_head;
525 prevp = auxp;
526 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head, (DLL *)auxp)) )
527 {
528 if(auxp->time == -1)
529 {
530 dll_remove(auxp);
531 free(auxp);
532 auxp = prevp;
533 n--;
534 if(!n)
535 break;
536 }
537 else
538 prevp = auxp;
539 }
540 }
541 ENABLE_AST;
542 return(1);
543}
544
545static int get_minimum(int deltat)
546{
547 TIMR_ENT *auxp, *queue_head;
548 int queue_id;
549 int min_time = 100000;
550
551 queue_head = timer_queues[WRITE_QUEUE].queue_head;
552 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
553 min_time = -10;
554 if((min_time != -10) || deltat)
555 {
556 if( (queue_head = timer_queues[SPECIAL_QUEUE].queue_head) != NULL)
557 {
558 auxp = queue_head;
559 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
560 {
561 auxp->time_left -= deltat;
562 if(auxp->time_left > 0)
563 {
564 if(auxp->time_left < min_time)
565 {
566 min_time = auxp->time_left;
567 }
568 }
569 }
570 }
571 for( queue_id = 0; queue_id < MAX_TIMER_QUEUES; queue_id++ )
572 {
573 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
574 continue;
575 auxp = queue_head;
576 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
577 {
578 auxp->time_left -= deltat;
579 if(auxp->time_left > 0)
580 {
581 if(auxp->time_left < min_time)
582 {
583 min_time = auxp->time_left;
584 }
585 }
586 else
587 {
588 if(auxp->time < min_time)
589 {
590 min_time = auxp->time;
591 }
592 }
593 if((!deltat) && (min_time <= 1))
594 break;
595 }
596 }
597 }
598 if(min_time == 100000)
599 min_time = 0;
600 return min_time;
601}
602
603static int stop_it()
604{
605 int min_time;
606 int deltat = 0;
607
608 DISABLE_AST
609 if(Alarm_runs)
610 {
611 my_alarm(0);
612 deltat = get_elapsed_time();
613 if(deltat != 0)
614 DIM_last_time = get_current_time(&DIM_last_time_millies);
615 Alarm_runs = 0;
616 }
617 min_time = get_minimum(deltat);
618 ENABLE_AST
619 return(min_time);
620}
621
622static int start_it(int new_time)
623{
624 int next_time;
625 TIMR_ENT *queue_head;
626
627 DISABLE_AST
628 next_time = new_time;
629 if(next_time > 0)
630 {
631 queue_head = timer_queues[WRITE_QUEUE].queue_head;
632 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
633 {
634 next_time = -10;
635 }
636 }
637 if(next_time)
638 {
639 my_alarm(next_time);
640 Alarm_runs = 1;
641 if(!DIM_last_time)
642 DIM_last_time = get_current_time(&DIM_last_time_millies);
643 }
644 else
645 DIM_last_time = 0;
646
647 ENABLE_AST
648 return(1);
649}
650
651static int scan_it()
652{
653 int queue_id, i, n = 0;
654 static int curr_queue_id = 0;
655 static TIMR_ENT *curr_entry = 0;
656 TIMR_ENT *auxp, *prevp, *queue_head;
657 TIMR_ENT *done[1024];
658
659 DISABLE_AST
660 queue_head = timer_queues[WRITE_QUEUE].queue_head;
661 if(!queue_head)
662 {
663 ENABLE_AST
664 return(0);
665 }
666 auxp = queue_head;
667 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
668 {
669 done[n++] = auxp;
670 if(n == 1000)
671 break;
672 }
673 ENABLE_AST
674 for(i = 0; i < n; i++)
675 {
676 auxp = done[i];
677 auxp->user_routine( auxp->tag );
678 }
679 {
680 DISABLE_AST
681 for(i = 0; i < n; i++)
682 {
683 auxp = done[i];
684 dll_remove(auxp);
685 free(auxp);
686 }
687 if(n == 1000)
688 {
689 ENABLE_AST
690 return(1);
691 }
692 ENABLE_AST
693 }
694 {
695 DISABLE_AST
696 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
697 auxp = queue_head;
698 prevp = auxp;
699 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
700 {
701 if(auxp->time_left <= 0)
702 {
703 dll_remove(auxp);
704 auxp->user_routine( auxp->tag );
705 free(auxp);
706 auxp = prevp;
707 n++;
708 if(n == 100)
709 {
710 ENABLE_AST
711 return(1);
712 }
713 }
714 else
715 prevp = auxp;
716 }
717 for( queue_id = curr_queue_id; queue_id < MAX_TIMER_QUEUES; queue_id++ )
718 {
719 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
720 continue;
721 Inside_ast = 1;
722 if((curr_entry) && (queue_id == curr_queue_id))
723 auxp = curr_entry;
724 else
725 auxp = queue_head;
726 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
727 {
728 if(auxp->time_left <= 0)
729 {
730 auxp->user_routine( auxp->tag );
731 auxp->time_left = auxp->time; /*restart clock*/
732 n++;
733 if(n == 100)
734 {
735 curr_queue_id = queue_id;
736 curr_entry = auxp;
737 ENABLE_AST
738 return(1);
739 }
740 }
741 }
742 Inside_ast = 0;
743 if( timer_queues[queue_id].remove_entries ) {
744 rem_deleted_entries( queue_id );
745 timer_queues[queue_id].remove_entries = 0;
746 }
747 }
748 curr_queue_id = 0;
749 curr_entry = 0;
750 ENABLE_AST
751 }
752 return(0);
753}
754
755static void alrm_sig_handler( int num)
756{
757 int next_time;
758
759 if(num){}
760 next_time = stop_it();
761 if(Threads_off)
762 {
763 if(scan_it())
764 next_time = -10;
765 }
766 else
767 {
768 while(scan_it());
769 }
770 if(!Alarm_runs)
771 {
772 start_it(next_time);
773 }
774}
775
/*
 * Default callback stored by dtq_add_entry() when the caller passes no
 * routine of its own; it does nothing.
 */
static void Std_timer_handler()
{
    /* intentionally empty */
}
779
780void dtq_start_timer(int time, void (*user_routine)(), long tag)
781{
782 extern void dim_init_threads();
783
784 if(!Threads_off)
785 {
786 dim_init_threads();
787 }
788 dim_dtq_init(0);
789 if(time != 0)
790 dtq_add_entry(SPECIAL_QUEUE, time, user_routine, tag);
791 else
792 dtq_add_entry(WRITE_QUEUE, time, user_routine, tag);
793}
794
795
796int dtq_stop_timer(long tag)
797{
798 TIMR_ENT *entry, *queue_head, *prevp;
799 int time_left = -1;
800
801 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
802 entry = queue_head;
803 prevp = entry;
804 while( (entry = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)entry)) )
805 {
806 if( entry->tag == tag )
807 {
808 time_left = dtq_rem_entry( SPECIAL_QUEUE, entry );
809 break;
810 }
811 }
812 return(time_left);
813}
814
/* Set by dtq_sleep() before it starts waiting; cleared by dtq_sleep_rout(). */
static int Dtq_sleeping = 0;

/*
 * Timer callback used by dtq_sleep(): clears Dtq_sleeping and, on WIN32,
 * calls wake_up().
 *
 * tag : not used.
 */
void dtq_sleep_rout(long tag)
{
    (void)tag;
    Dtq_sleeping = 0;
#ifdef WIN32
    wake_up();
#endif
}
825
826#ifndef WIN32
827
/*
 * Sleep for 'secs' seconds (non-WIN32 build).
 *
 * Default build: the delay is made up of half-second dim_usleep() calls.
 * NOTHREADS build: SIGALRM is unblocked, a timer with dtq_sleep_rout() as
 * callback is started and the caller pauses until that callback has cleared
 * Dtq_sleeping; the previous signal mask is restored afterwards.
 *
 * Always returns 0.
 */
unsigned int dtq_sleep(int secs)
{

#ifndef NOTHREADS
    int i;

    /* Two half-second naps per second; counting seconds avoids forming secs*2. */
    for( i = 0; i < secs; i++ )
    {
        dim_usleep(500000);
        dim_usleep(500000);
    }
    return(0);
#else
    sigset_t unblock, saved;

    sigemptyset(&unblock);
    sigaddset(&unblock, SIGALRM);
    sigprocmask(SIG_UNBLOCK, &unblock, &saved);
    Dtq_sleeping = 1;
    /* The tag is a long; its value is not used by dtq_sleep_rout(). */
    dtq_start_timer(secs, dtq_sleep_rout, (long)123);
    do{
        pause();
    }while(Dtq_sleeping);
    sigprocmask(SIG_SETMASK, &saved, 0);
    return(0);
#endif
}
853
854#else
855
856unsigned int dtq_sleep(int secs)
857{
858 Dtq_sleeping = 1;
859 dtq_start_timer(secs, dtq_sleep_rout, 1);
860 do{
861 dim_wait();
862 }while(Dtq_sleeping);
863 return(0);
864}
865
866#endif
Note: See TracBrowser for help on using the repository browser.