source: trunk/FACT++/dim/src/dtq.c@ 14580

Last change on this file since 14580 was 14575, checked in by tbretz, 12 years ago
Updated to v20r01.
File size: 15.6 KB
Line 
1/*
2 * DTQ (Delphi Timer Queue) implements the action scheduling for the DIM
3 * (Delphi Information Managment) System.
4 * It will be used by servers clients and the Name Server.
5 *
6 * Started date : 10-11-91
7 * Written by : C. Gaspar
8 * UNIX adjustment: G.C. Ballintijn
9 */
10
11/* include files */
12#include <signal.h>
13#include <stdio.h>
14#define DIMLIB
15#include <dim.h>
16
17#ifdef VxWorks
18#include <time.h>
19#endif
20
21#include <sys/timeb.h>
22
23/* global definitions */
24#define MAX_TIMER_QUEUES 16 /* Number of normal queue's */
25#define SPECIAL_QUEUE 16 /* The queue for the queue-less */
26#define WRITE_QUEUE 17
27
28_DIM_PROTO( static void alrm_sig_handler, (int num) );
29_DIM_PROTO( static void Std_timer_handler, () );
30_DIM_PROTO( static int stop_it, (int new_time) );
31_DIM_PROTO( static int start_it, (int new_time) );
32_DIM_PROTO( static int scan_it, () );
33_DIM_PROTO( static int get_minimum, (int deltat) );
34_DIM_PROTO( int dtq_task, (void *dummy) );
35_DIM_PROTO( static int my_alarm, (int secs) );
36_DIM_PROTO( int dim_dtq_init, (int thr_flag) );
37#ifndef WIN32
38_DIM_PROTO( static void dummy_alrm_sig_handler, (int num) );
39#endif
40
41typedef struct {
42 TIMR_ENT *queue_head;
43 int remove_entries;
44} QUEUE_ENT;
45
46
47static QUEUE_ENT timer_queues[MAX_TIMER_QUEUES + 2] = {
48 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0},
49 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}
50};
51
52static int Inside_ast = 0;
53static int Alarm_runs = 0;
54static int sigvec_done = 0;
55
56#ifdef VxWorks
57static timer_t Timer_id;
58#endif
59
60static time_t DIM_last_time = 0;
61static int DIM_last_time_millies = 0;
62static int DIM_next_time = 0;
63static int DIM_time_left = 0;
64static int Threads_off = 0;
65
66/*
67 * DTQ routines
68 */
69
70
71#ifndef WIN32
72
73void dim_no_threads()
74{
75 extern void dic_no_threads();
76 extern void dis_no_threads();
77
78 Threads_off = 1;
79 dic_no_threads();
80 dis_no_threads();
81}
82
83int dim_dtq_init(int thr_flag)
84{
85struct sigaction sig_info;
86sigset_t set;
87int ret = 0;
88
89/*
90 pid = getpid();
91*/
92 if( !sigvec_done)
93 {
94 Inside_ast = 0;
95 Alarm_runs = 0;
96 DIM_last_time = 0;
97/*
98 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
99 {
100 timer_queues[i].queue_head = 0;
101 timer_queues[i].remove_entries = 0;
102 }
103*/
104 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
105 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
106 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
107 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
108 }
109 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
110 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
111 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
112 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
113 }
114 if(!thr_flag)
115 {
116 Threads_off = 1;
117 }
118 sigemptyset(&set);
119
120 sigaddset(&set,SIGIO);
121
122 if(thr_flag)
123 sig_info.sa_handler = dummy_alrm_sig_handler;
124 else
125 sig_info.sa_handler = alrm_sig_handler;
126 sig_info.sa_mask = set;
127#ifndef LYNXOS
128 sig_info.sa_flags = SA_RESTART;
129#else
130 sig_info.sa_flags = 0;
131#endif
132 if( sigaction(SIGALRM, &sig_info, 0) < 0 ) {
133 perror( "sigaction(SIGALRM)" );
134 exit(1);
135 }
136
137 sigvec_done = 1;
138 ret = 1;
139 }
140 return(ret);
141}
142
143void dummy_alrm_sig_handler( int num )
144{
145 if(num){}
146}
147
148#else
149
150int dim_dtq_init(int thr_flag)
151{
152 int tid = 1;
153 void create_alrm_thread(void);
154
155 if( !sigvec_done ) {
156 Inside_ast = 0;
157 Alarm_runs = 0;
158 DIM_last_time = 0;
159/*
160 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
161 {
162 timer_queues[i].queue_head = 0;
163 timer_queues[i].remove_entries = 0;
164 }
165*/
166 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
167 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
168 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
169 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
170 }
171 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
172 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
173 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
174 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
175 }
176/*
177#ifndef STDCALL
178 tid = _beginthread((void *)(void *)dtq_task,0,NULL);
179#else
180 tid = _beginthreadex(NULL, NULL,
181 dtq_task,0,0,NULL);
182#endif
183*/
184 create_alrm_thread();
185 sigvec_done = 1;
186 }
187 return(tid);
188}
189
190#endif
191
192void dim_dtq_stop()
193{
194/*
195 int i;
196 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
197 {
198 if( timer_queues[i].queue_head != NULL)
199 {
200 dtq_delete(i);
201 free((TIMR_ENT *)timer_queues[i].queue_head);
202 timer_queues[i].queue_head = 0;
203 }
204 }
205*/
206 scan_it();
207 if( timer_queues[WRITE_QUEUE].queue_head != NULL)
208 {
209 dtq_delete(WRITE_QUEUE);
210 free((TIMR_ENT *)timer_queues[WRITE_QUEUE].queue_head);
211 timer_queues[WRITE_QUEUE].queue_head = 0;
212 }
213 sigvec_done = 0;
214}
215
216static int get_current_time(int *millies)
217{
218 int secs;
219#ifdef WIN32
220 struct timeb timebuf;
221#else
222 struct timeval tv;
223 struct timezone *tz;
224#endif
225
226#ifdef WIN32
227 ftime(&timebuf);
228 secs = (int)timebuf.time;
229 *millies = timebuf.millitm;
230#else
231 tz = 0;
232 gettimeofday(&tv, tz);
233 secs = tv.tv_sec;
234 *millies = tv.tv_usec / 1000;
235#endif
236 return secs;
237}
238
239static int get_elapsed_time()
240{
241 int millies, deltat;
242 int now;
243
244 now = get_current_time(&millies);
245 deltat = now - (int)DIM_last_time;
246 if((millies + 50) < DIM_last_time_millies)
247 {
248 deltat --;
249 }
250 return deltat;
251}
252
253static int my_alarm(int secs)
254{
255 int ret;
256
257 DIM_next_time = secs;
258#ifndef WIN32
259 if(Threads_off)
260 {
261 if( secs < 0)
262 {
263 kill(getpid(),SIGALRM);
264 return(0);
265 }
266 else
267 {
268 return(alarm(secs));
269 }
270 }
271 else
272 {
273#endif
274
275 ret = DIM_time_left;
276
277 if(secs == 0)
278 DIM_next_time = -1;
279 return(ret);
280#ifndef WIN32
281 }
282#endif
283}
284
285void dim_usleep(int usecs)
286{
287
288#ifndef WIN32
289 struct timeval timeout;
290
291 timeout.tv_sec = 0;
292 timeout.tv_usec = usecs;
293 select(FD_SETSIZE, NULL, NULL, NULL, &timeout);
294#else
295 usleep(usecs);
296#endif
297}
298
299int dtq_task(void *dummy)
300{
301int deltat;
302static int to_go;
303
304 if(dummy){}
305 while(1)
306 {
307 if(DIM_next_time)
308 {
309 DISABLE_AST
310 DIM_time_left = DIM_next_time;
311 if(DIM_time_left == -1)
312 DIM_time_left = 0;
313 to_go = DIM_next_time;
314 DIM_next_time = 0;
315 ENABLE_AST
316 }
317 if(DIM_time_left < 0)
318 {
319 DIM_time_left = 0;
320 alrm_sig_handler(2);
321#ifndef WIN32
322 return(1);
323#endif
324 }
325 else if(DIM_time_left > 0)
326 {
327 dim_usleep(100000);
328 deltat = get_elapsed_time();
329 DIM_time_left = to_go - deltat;
330 if(DIM_time_left <= 0)
331 {
332 alrm_sig_handler(2);
333#ifndef WIN32
334 return(1);
335#endif
336 }
337 }
338 else
339 {
340 dim_usleep(1000);
341 }
342 }
343}
344
345int dtq_create()
346{
347 int i;
348 extern void dim_init_threads(void);
349
350 if(!Threads_off)
351 {
352 dim_init_threads();
353 }
354 dim_dtq_init(0);
355 for( i = 1; i < MAX_TIMER_QUEUES; i++ )
356 if( timer_queues[i].queue_head == 0 )
357 break;
358
359 if( i == MAX_TIMER_QUEUES )
360 return(0);
361
362 timer_queues[i].queue_head = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
363 memset( timer_queues[i].queue_head, 0, sizeof(TIMR_ENT) );
364 dll_init( (DLL *)timer_queues[i].queue_head);
365
366 return(i);
367}
368
369
370int dtq_delete(int queue_id)
371{
372 TIMR_ENT *queue_head, *entry;
373
374 DISABLE_AST
375 queue_head = timer_queues[queue_id].queue_head;
376 if(queue_head)
377 {
378 while(!dll_empty((DLL *)queue_head))
379 {
380 entry = queue_head->next;
381 dll_remove(entry);
382 free(entry);
383 }
384 free(queue_head);
385 timer_queues[queue_id].queue_head = 0;
386 }
387 ENABLE_AST
388 return(1);
389}
390
391TIMR_ENT *dtq_add_entry(int queue_id, int time, void (*user_routine)(), dim_long tag)
392{
393 TIMR_ENT *new_entry, *queue_head, *auxp, *prevp;
394 int next_time, min_time = 100000;
395 int time_left, deltat = 0;
396
397 DISABLE_AST
398
399 next_time = time;
400 if(!next_time)
401 next_time = -10;
402 if(Alarm_runs)
403 {
404 time_left = DIM_time_left;
405 if(!time_left)
406 time_left = DIM_next_time;
407 if((time_left > next_time) || (queue_id == SPECIAL_QUEUE))
408 {
409 if(next_time != -10)
410 {
411 min_time = stop_it();
412 if((next_time > min_time) && (min_time != 0))
413 next_time = min_time;
414 }
415 else
416 my_alarm(next_time);
417 }
418 else
419 {
420 deltat = get_elapsed_time();
421 }
422 }
423 new_entry = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
424 new_entry->time = time;
425 if( user_routine )
426 new_entry->user_routine = user_routine;
427 else
428 new_entry->user_routine = Std_timer_handler;
429 new_entry->tag = tag;
430 new_entry->time_left = time + deltat;
431
432 queue_head = timer_queues[queue_id].queue_head;
433 if(!time)
434 {
435 dll_insert_after((DLL *)queue_head->prev, (DLL *)new_entry);
436 }
437 else
438 {
439 if(queue_head)
440 {
441 auxp = queue_head;
442 prevp = auxp;
443 while((auxp = (TIMR_ENT *)dll_get_prev((DLL *)queue_head, (DLL *)auxp)))
444 {
445 if(time >= auxp->time)
446 {
447 break;
448 }
449 prevp = auxp;
450 }
451/*
452 if(auxp)
453 {
454 if(queue_id != SPECIAL_QUEUE)
455 {
456 if(auxp->time_left > 0)
457 {
458 if(auxp->time == time)
459 new_entry->time_left = auxp->time_left;
460 }
461 }
462 prevp = auxp;
463 }
464*/
465 dll_insert_after((DLL *)prevp, (DLL *)new_entry);
466 }
467 }
468 if(!Alarm_runs)
469 {
470 if((next_time != -10) && (min_time == 100000))
471 {
472 min_time = get_minimum(0);
473 if(next_time > min_time)
474 next_time = min_time;
475 }
476 start_it(next_time);
477 }
478 ENABLE_AST
479 return(new_entry);
480}
481
482int dtq_clear_entry(TIMR_ENT *entry)
483{
484 int time_left, deltat = 0;
485
486 DISABLE_AST
487 deltat = get_elapsed_time();
488 time_left = entry->time_left - deltat;
489 entry->time_left = entry->time + deltat;
490 ENABLE_AST
491 return(time_left);
492}
493
494
495int dtq_rem_entry(int queue_id, TIMR_ENT *entry)
496{
497 int time_left, deltat = 0;
498
499 DISABLE_AST
500 deltat = get_elapsed_time();
501 time_left = entry->time_left - deltat;
502 if( Inside_ast )
503 {
504 timer_queues[queue_id].remove_entries++;
505 entry->time = -1;
506 ENABLE_AST
507 return(time_left);
508 }
509 dll_remove(entry);
510 free(entry);
511
512 ENABLE_AST
513 return(time_left);
514}
515
516static int rem_deleted_entries(int queue_id)
517{
518 TIMR_ENT *auxp, *prevp, *queue_head;
519 int n;
520
521 DISABLE_AST
522 queue_head = timer_queues[queue_id].queue_head;
523 n = timer_queues[queue_id].remove_entries;
524 if(queue_head)
525 {
526 auxp = queue_head;
527 prevp = auxp;
528 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head, (DLL *)auxp)) )
529 {
530 if(auxp->time == -1)
531 {
532 dll_remove(auxp);
533 free(auxp);
534 auxp = prevp;
535 n--;
536 if(!n)
537 break;
538 }
539 else
540 prevp = auxp;
541 }
542 }
543 ENABLE_AST;
544 return(1);
545}
546
547static int get_minimum(int deltat)
548{
549 TIMR_ENT *auxp, *queue_head;
550 int queue_id;
551 int min_time = 100000;
552
553 queue_head = timer_queues[WRITE_QUEUE].queue_head;
554 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
555 min_time = -10;
556 if((min_time != -10) || deltat)
557 {
558 if( (queue_head = timer_queues[SPECIAL_QUEUE].queue_head) != NULL)
559 {
560 auxp = queue_head;
561 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
562 {
563 auxp->time_left -= deltat;
564 if(auxp->time_left > 0)
565 {
566 if(auxp->time_left < min_time)
567 {
568 min_time = auxp->time_left;
569 }
570 }
571 }
572 }
573 for( queue_id = 0; queue_id < MAX_TIMER_QUEUES; queue_id++ )
574 {
575 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
576 continue;
577 auxp = queue_head;
578 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
579 {
580 auxp->time_left -= deltat;
581 if(auxp->time_left > 0)
582 {
583 if(auxp->time_left < min_time)
584 {
585 min_time = auxp->time_left;
586 }
587 }
588 else
589 {
590 if(auxp->time < min_time)
591 {
592 min_time = auxp->time;
593 }
594 }
595 if((!deltat) && (min_time <= 1))
596 break;
597 }
598 }
599 }
600 if(min_time == 100000)
601 min_time = 0;
602 return min_time;
603}
604
605static int stop_it()
606{
607 int min_time;
608 int deltat = 0;
609
610 DISABLE_AST
611 if(Alarm_runs)
612 {
613 my_alarm(0);
614 deltat = get_elapsed_time();
615 if(deltat != 0)
616 DIM_last_time = get_current_time(&DIM_last_time_millies);
617 Alarm_runs = 0;
618 }
619 min_time = get_minimum(deltat);
620 ENABLE_AST
621 return(min_time);
622}
623
624static int start_it(int new_time)
625{
626 int next_time;
627 TIMR_ENT *queue_head;
628
629 DISABLE_AST
630 next_time = new_time;
631 if(next_time > 0)
632 {
633 queue_head = timer_queues[WRITE_QUEUE].queue_head;
634 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
635 {
636 next_time = -10;
637 }
638 }
639 if(next_time)
640 {
641 my_alarm(next_time);
642 Alarm_runs = 1;
643 if(!DIM_last_time)
644 DIM_last_time = get_current_time(&DIM_last_time_millies);
645 }
646 else
647 DIM_last_time = 0;
648
649 ENABLE_AST
650 return(1);
651}
652
653static int scan_it()
654{
655 int queue_id, i, n = 0;
656 static int curr_queue_id = 0;
657 static TIMR_ENT *curr_entry = 0;
658 TIMR_ENT *auxp, *prevp, *queue_head;
659 TIMR_ENT *done[1024];
660
661 DISABLE_AST
662 queue_head = timer_queues[WRITE_QUEUE].queue_head;
663 if(!queue_head)
664 {
665 ENABLE_AST
666 return(0);
667 }
668 auxp = queue_head;
669 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
670 {
671 done[n++] = auxp;
672 if(n == 1000)
673 break;
674 }
675 ENABLE_AST
676 for(i = 0; i < n; i++)
677 {
678 auxp = done[i];
679 auxp->user_routine( auxp->tag );
680 }
681 {
682 DISABLE_AST
683 for(i = 0; i < n; i++)
684 {
685 auxp = done[i];
686 dll_remove(auxp);
687 free(auxp);
688 }
689 if(n == 1000)
690 {
691 ENABLE_AST
692 return(1);
693 }
694 ENABLE_AST
695 }
696 {
697 DISABLE_AST
698 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
699 auxp = queue_head;
700 prevp = auxp;
701 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
702 {
703 if(auxp->time_left <= 0)
704 {
705 dll_remove(auxp);
706 auxp->user_routine( auxp->tag );
707 free(auxp);
708 auxp = prevp;
709 n++;
710 if(n == 100)
711 {
712 ENABLE_AST
713 return(1);
714 }
715 }
716 else
717 prevp = auxp;
718 }
719 for( queue_id = curr_queue_id; queue_id < MAX_TIMER_QUEUES; queue_id++ )
720 {
721 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
722 continue;
723 Inside_ast = 1;
724 if((curr_entry) && (queue_id == curr_queue_id))
725 auxp = curr_entry;
726 else
727 auxp = queue_head;
728 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
729 {
730 if(auxp->time_left <= 0)
731 {
732 auxp->user_routine( auxp->tag );
733 auxp->time_left = auxp->time; /*restart clock*/
734 n++;
735 if(n == 100)
736 {
737 curr_queue_id = queue_id;
738 curr_entry = auxp;
739 ENABLE_AST
740 return(1);
741 }
742 }
743 }
744 Inside_ast = 0;
745 if( timer_queues[queue_id].remove_entries ) {
746 rem_deleted_entries( queue_id );
747 timer_queues[queue_id].remove_entries = 0;
748 }
749 }
750 curr_queue_id = 0;
751 curr_entry = 0;
752 ENABLE_AST
753 }
754 return(0);
755}
756
757static void alrm_sig_handler( int num)
758{
759 int next_time;
760
761 if(num){}
762 next_time = stop_it();
763 if(Threads_off)
764 {
765 if(scan_it())
766 next_time = -10;
767 }
768 else
769 {
770 while(scan_it());
771 }
772 if(!Alarm_runs)
773 {
774 start_it(next_time);
775 }
776}
777
778static void Std_timer_handler()
779{
780}
781
782void dtq_start_timer(int time, void (*user_routine)(), dim_long tag)
783{
784 extern void dim_init_threads();
785
786 if(!Threads_off)
787 {
788 dim_init_threads();
789 }
790 dim_dtq_init(0);
791 if(time != 0)
792 dtq_add_entry(SPECIAL_QUEUE, time, user_routine, tag);
793 else
794 dtq_add_entry(WRITE_QUEUE, time, user_routine, tag);
795}
796
797
798int dtq_stop_timer(dim_long tag)
799{
800 TIMR_ENT *entry, *queue_head;
801 int time_left = -1;
802
803 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
804 entry = queue_head;
805 while( (entry = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)entry)) )
806 {
807 if( entry->tag == tag )
808 {
809 time_left = dtq_rem_entry( SPECIAL_QUEUE, entry );
810 break;
811 }
812 }
813 return(time_left);
814}
815
816static int Dtq_sleeping = 0;
817
818void dtq_sleep_rout(dim_long tag)
819{
820 if(tag){}
821 Dtq_sleeping = 0;
822#ifdef WIN32
823 wake_up();
824#endif
825}
826
827#ifndef WIN32
828
829unsigned int dtq_sleep(int secs)
830{
831
832#ifndef NOTHREADS
833 int i;
834 for(i = 0; i < secs*2; i++)
835 {
836 dim_usleep(500000);
837 }
838 return(0);
839#else
840 sigset_t set, oset;
841
842 sigemptyset(&set);
843 sigaddset(&set,SIGALRM);
844 sigprocmask(SIG_UNBLOCK, &set, &oset);
845 Dtq_sleeping = 1;
846 dtq_start_timer(secs, dtq_sleep_rout, (void *)123);
847 do{
848 pause();
849 }while(Dtq_sleeping);
850 sigprocmask(SIG_SETMASK,&oset,0);
851 return(0);
852#endif
853}
854
855#else
856
857unsigned int dtq_sleep(int secs)
858{
859 Dtq_sleeping = 1;
860 dtq_start_timer(secs, dtq_sleep_rout, 1);
861 do{
862 dim_wait();
863 }while(Dtq_sleeping);
864 return(0);
865}
866
867#endif
Note: See TracBrowser for help on using the repository browser.