source: trunk/FACT++/dim/src/dtq.c@ 17010

Last change on this file since 17010 was 15282, checked in by tbretz, 12 years ago
Updated to v20r7.
File size: 15.8 KB
Line 
1/*
2 * DTQ (Delphi Timer Queue) implements the action scheduling for the DIM
3 * (Delphi Information Management) System.
4 * It will be used by servers, clients and the Name Server.
5 *
6 * Started date : 10-11-91
7 * Written by : C. Gaspar
8 * UNIX adjustment: G.C. Ballintijn
9 */
10
11/* include files */
12#ifndef WIN32
13#ifndef NOTHREADS
14int DIM_Threads_OFF = 0;
15#else
16int DIM_Threads_OFF = 1;
17#endif
18#endif
19#include <signal.h>
20#include <stdio.h>
21#define DIMLIB
22#include <dim.h>
23
24#ifdef VxWorks
25#include <time.h>
26#endif
27
28#include <sys/timeb.h>
29
30/* global definitions */
31#define MAX_TIMER_QUEUES 16 /* Number of normal queue's */
32#define SPECIAL_QUEUE 16 /* The queue for the queue-less */
33#define WRITE_QUEUE 17
34
35_DIM_PROTO( static void alrm_sig_handler, (int num) );
36_DIM_PROTO( static void Std_timer_handler, () );
37_DIM_PROTO( static int stop_it, (int new_time) );
38_DIM_PROTO( static int start_it, (int new_time) );
39_DIM_PROTO( static int scan_it, () );
40_DIM_PROTO( static int get_minimum, (int deltat) );
41_DIM_PROTO( int dtq_task, (void *dummy) );
42_DIM_PROTO( static int my_alarm, (int secs) );
43_DIM_PROTO( int dim_dtq_init, (int thr_flag) );
44#ifndef WIN32
45_DIM_PROTO( static void dummy_alrm_sig_handler, (int num) );
46#endif
47
48typedef struct {
49 TIMR_ENT *queue_head;
50 int remove_entries;
51} QUEUE_ENT;
52
53
54static QUEUE_ENT timer_queues[MAX_TIMER_QUEUES + 2] = {
55 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0},
56 {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}
57};
58
59static int Inside_ast = 0;
60static int Alarm_runs = 0;
61static int sigvec_done = 0;
62
63#ifdef VxWorks
64static timer_t Timer_id;
65#endif
66
67static time_t DIM_last_time = 0;
68static int DIM_last_time_millies = 0;
69static int DIM_next_time = 0;
70static int DIM_time_left = 0;
71static int Threads_off = 0;
72
73/*
74 * DTQ routines
75 */
76
77
78#ifndef WIN32
79
80void dim_no_threads()
81{
82 extern void dic_no_threads();
83 extern void dis_no_threads();
84
85 DIM_Threads_OFF = 1;
86 Threads_off = 1;
87 dic_no_threads();
88 dis_no_threads();
89}
90
91int dim_dtq_init(int thr_flag)
92{
93struct sigaction sig_info;
94sigset_t set;
95int ret = 0;
96
97/*
98 pid = getpid();
99*/
100 if( !sigvec_done)
101 {
102 Inside_ast = 0;
103 Alarm_runs = 0;
104 DIM_last_time = 0;
105/*
106 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
107 {
108 timer_queues[i].queue_head = 0;
109 timer_queues[i].remove_entries = 0;
110 }
111*/
112 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
113 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
114 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
115 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
116 }
117 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
118 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
119 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
120 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
121 }
122 if(!thr_flag)
123 {
124 Threads_off = 1;
125 }
126 sigemptyset(&set);
127
128 sigaddset(&set,SIGIO);
129
130 if(thr_flag)
131 sig_info.sa_handler = dummy_alrm_sig_handler;
132 else
133 sig_info.sa_handler = alrm_sig_handler;
134 sig_info.sa_mask = set;
135#ifndef LYNXOS
136 sig_info.sa_flags = SA_RESTART;
137#else
138 sig_info.sa_flags = 0;
139#endif
140 if( sigaction(SIGALRM, &sig_info, 0) < 0 ) {
141 perror( "sigaction(SIGALRM)" );
142 exit(1);
143 }
144
145 sigvec_done = 1;
146 ret = 1;
147 }
148 return(ret);
149}
150
/*
 * SIGALRM handler that does nothing; dim_dtq_init() installs it when it is
 * called with a non-zero thr_flag.
 */
void dummy_alrm_sig_handler( int num )
{
	(void)num;	/* signal number is not needed */
}
155
156#else
157
158int dim_dtq_init(int thr_flag)
159{
160 int tid = 1;
161 void create_alrm_thread(void);
162
163 if( !sigvec_done ) {
164 Inside_ast = 0;
165 Alarm_runs = 0;
166 DIM_last_time = 0;
167/*
168 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
169 {
170 timer_queues[i].queue_head = 0;
171 timer_queues[i].remove_entries = 0;
172 }
173*/
174 if( timer_queues[SPECIAL_QUEUE].queue_head == NULL ) {
175 timer_queues[SPECIAL_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
176 memset(timer_queues[SPECIAL_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
177 dll_init( (DLL *)timer_queues[SPECIAL_QUEUE].queue_head);
178 }
179 if( timer_queues[WRITE_QUEUE].queue_head == NULL ) {
180 timer_queues[WRITE_QUEUE].queue_head = (TIMR_ENT *)malloc(sizeof(TIMR_ENT));
181 memset(timer_queues[WRITE_QUEUE].queue_head, 0, sizeof(TIMR_ENT));
182 dll_init( (DLL *)timer_queues[WRITE_QUEUE].queue_head);
183 }
184/*
185#ifndef STDCALL
186 tid = _beginthread((void *)(void *)dtq_task,0,NULL);
187#else
188 tid = _beginthreadex(NULL, NULL,
189 dtq_task,0,0,NULL);
190#endif
191*/
192 create_alrm_thread();
193 sigvec_done = 1;
194 }
195 return(tid);
196}
197
198#endif
199
200void dim_dtq_stop()
201{
202/*
203 int i;
204 for(i = 0; i < MAX_TIMER_QUEUES + 2; i++)
205 {
206 if( timer_queues[i].queue_head != NULL)
207 {
208 dtq_delete(i);
209 free((TIMR_ENT *)timer_queues[i].queue_head);
210 timer_queues[i].queue_head = 0;
211 }
212 }
213*/
214 scan_it();
215 if( timer_queues[WRITE_QUEUE].queue_head != NULL)
216 {
217 dtq_delete(WRITE_QUEUE);
218 free((TIMR_ENT *)timer_queues[WRITE_QUEUE].queue_head);
219 timer_queues[WRITE_QUEUE].queue_head = 0;
220 }
221 sigvec_done = 0;
222}
223
/*
 * Read the wall clock.
 *
 * millies: receives the millisecond part (0..999) of the current time.
 * Returns the current time in whole seconds, truncated to int.
 */
static int get_current_time(int *millies)
{
#ifdef WIN32
	struct timeb now;

	ftime(&now);
	*millies = now.millitm;
	return (int)now.time;
#else
	struct timeval now;

	/* No time-zone information is requested. */
	gettimeofday(&now, (struct timezone *)0);
	*millies = (int)now.tv_usec / 1000;
	return (int)now.tv_sec;
#endif
}
246
247static int get_elapsed_time()
248{
249 int millies, deltat;
250 int now;
251
252 now = get_current_time(&millies);
253 deltat = now - (int)DIM_last_time;
254 if((millies + 50) < DIM_last_time_millies)
255 {
256 deltat --;
257 }
258 return deltat;
259}
260
261static int my_alarm(int secs)
262{
263 int ret;
264
265 DIM_next_time = secs;
266#ifndef WIN32
267 if(Threads_off)
268 {
269 if( secs < 0)
270 {
271 kill(getpid(),SIGALRM);
272 return(0);
273 }
274 else
275 {
276 return((int)alarm((unsigned int)secs));
277 }
278 }
279 else
280 {
281#endif
282
283 ret = DIM_time_left;
284
285 if(secs == 0)
286 DIM_next_time = -1;
287 return(ret);
288#ifndef WIN32
289 }
290#endif
291}
292
/*
 * Suspend the caller for about usecs microseconds.
 *
 * On non-WIN32 builds this is a select() call with no descriptors and only
 * a timeout.  The interval is split into seconds and microseconds so that
 * tv_usec stays below 1000000 even for sleeps of one second or more, and a
 * negative request is treated as zero.  On WIN32 the request is passed to
 * usleep() unchanged.
 */
void dim_usleep(int usecs)
{

#ifndef WIN32
	struct timeval timeout;

	if( usecs < 0 )
		usecs = 0;
	timeout.tv_sec = usecs / 1000000;
	timeout.tv_usec = usecs % 1000000;
	select(FD_SETSIZE, NULL, NULL, NULL, &timeout);
#else
	usleep(usecs);
#endif
}
306
307int dtq_task(void *dummy)
308{
309int deltat;
310static int to_go;
311
312 if(dummy){}
313 while(1)
314 {
315 if(DIM_next_time)
316 {
317 DISABLE_AST
318 DIM_time_left = DIM_next_time;
319 if(DIM_time_left == -1)
320 DIM_time_left = 0;
321 to_go = DIM_next_time;
322 DIM_next_time = 0;
323 ENABLE_AST
324 }
325 if(DIM_time_left < 0)
326 {
327 DIM_time_left = 0;
328 alrm_sig_handler(2);
329#ifndef WIN32
330 return(1);
331#endif
332 }
333 else if(DIM_time_left > 0)
334 {
335 dim_usleep(100000);
336 deltat = get_elapsed_time();
337 DIM_time_left = to_go - deltat;
338 if(DIM_time_left <= 0)
339 {
340 alrm_sig_handler(2);
341#ifndef WIN32
342 return(1);
343#endif
344 }
345 }
346 else
347 {
348 dim_usleep(1000);
349 }
350 }
351}
352
353int dtq_create()
354{
355 int i;
356 extern void dim_init_threads(void);
357
358 if(!Threads_off)
359 {
360 dim_init_threads();
361 }
362 dim_dtq_init(0);
363 for( i = 1; i < MAX_TIMER_QUEUES; i++ )
364 if( timer_queues[i].queue_head == 0 )
365 break;
366
367 if( i == MAX_TIMER_QUEUES )
368 return(0);
369
370 timer_queues[i].queue_head = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
371 memset( timer_queues[i].queue_head, 0, sizeof(TIMR_ENT) );
372 dll_init( (DLL *)timer_queues[i].queue_head);
373
374 return(i);
375}
376
377
378int dtq_delete(int queue_id)
379{
380 TIMR_ENT *queue_head, *entry;
381
382 DISABLE_AST
383 queue_head = timer_queues[queue_id].queue_head;
384 if(queue_head)
385 {
386 while(!dll_empty((DLL *)queue_head))
387 {
388 entry = queue_head->next;
389 dll_remove(entry);
390 free(entry);
391 }
392 free(queue_head);
393 timer_queues[queue_id].queue_head = 0;
394 }
395 ENABLE_AST
396 return(1);
397}
398
399TIMR_ENT *dtq_add_entry(int queue_id, int time, void (*user_routine)(), dim_long tag)
400{
401 TIMR_ENT *new_entry, *queue_head, *auxp, *prevp;
402 int next_time, min_time = 100000;
403 int time_left, deltat = 0;
404
405 DISABLE_AST
406
407 next_time = time;
408 if(!next_time)
409 next_time = -10;
410 if(Alarm_runs)
411 {
412 time_left = DIM_time_left;
413 if(!time_left)
414 time_left = DIM_next_time;
415 if((time_left > next_time) || (queue_id == SPECIAL_QUEUE))
416 {
417 if(next_time != -10)
418 {
419 min_time = stop_it();
420 if((next_time > min_time) && (min_time != 0))
421 next_time = min_time;
422 }
423 else
424 my_alarm(next_time);
425 }
426 else
427 {
428 deltat = get_elapsed_time();
429 }
430 }
431 new_entry = (TIMR_ENT *)malloc( sizeof(TIMR_ENT) );
432 new_entry->time = time;
433 if( user_routine )
434 new_entry->user_routine = user_routine;
435 else
436 new_entry->user_routine = Std_timer_handler;
437 new_entry->tag = tag;
438 new_entry->time_left = time + deltat;
439
440 queue_head = timer_queues[queue_id].queue_head;
441 if(!time)
442 {
443 dll_insert_after((DLL *)queue_head->prev, (DLL *)new_entry);
444 }
445 else
446 {
447 if(queue_head)
448 {
449 auxp = queue_head;
450 prevp = auxp;
451 while((auxp = (TIMR_ENT *)dll_get_prev((DLL *)queue_head, (DLL *)auxp)))
452 {
453 if(time >= auxp->time)
454 {
455 break;
456 }
457 prevp = auxp;
458 }
459/*
460 if(auxp)
461 {
462 if(queue_id != SPECIAL_QUEUE)
463 {
464 if(auxp->time_left > 0)
465 {
466 if(auxp->time == time)
467 new_entry->time_left = auxp->time_left;
468 }
469 }
470 prevp = auxp;
471 }
472*/
473 dll_insert_after((DLL *)prevp, (DLL *)new_entry);
474 }
475 }
476 if(!Alarm_runs)
477 {
478 if((next_time != -10) && (min_time == 100000))
479 {
480 min_time = get_minimum(0);
481 if(next_time > min_time)
482 next_time = min_time;
483 }
484 start_it(next_time);
485 }
486 ENABLE_AST
487 return(new_entry);
488}
489
490int dtq_clear_entry(TIMR_ENT *entry)
491{
492 int time_left, deltat = 0;
493
494 DISABLE_AST
495 deltat = get_elapsed_time();
496 time_left = entry->time_left - deltat;
497 entry->time_left = entry->time + deltat;
498 ENABLE_AST
499 return(time_left);
500}
501
502
503int dtq_rem_entry(int queue_id, TIMR_ENT *entry)
504{
505 int time_left, deltat = 0;
506
507 DISABLE_AST
508 deltat = get_elapsed_time();
509 time_left = entry->time_left - deltat;
510 if( Inside_ast )
511 {
512 timer_queues[queue_id].remove_entries++;
513 entry->time = -1;
514 ENABLE_AST
515 return(time_left);
516 }
517 dll_remove(entry);
518 free(entry);
519
520 ENABLE_AST
521 return(time_left);
522}
523
524static int rem_deleted_entries(int queue_id)
525{
526 TIMR_ENT *auxp, *prevp, *queue_head;
527 int n;
528
529 DISABLE_AST
530 queue_head = timer_queues[queue_id].queue_head;
531 n = timer_queues[queue_id].remove_entries;
532 if(queue_head)
533 {
534 auxp = queue_head;
535 prevp = auxp;
536 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head, (DLL *)auxp)) )
537 {
538 if(auxp->time == -1)
539 {
540 dll_remove(auxp);
541 free(auxp);
542 auxp = prevp;
543 n--;
544 if(!n)
545 break;
546 }
547 else
548 prevp = auxp;
549 }
550 }
551 ENABLE_AST;
552 return(1);
553}
554
555static int get_minimum(int deltat)
556{
557 TIMR_ENT *auxp, *queue_head;
558 int queue_id;
559 int min_time = 100000;
560
561 queue_head = timer_queues[WRITE_QUEUE].queue_head;
562 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
563 min_time = -10;
564 if((min_time != -10) || deltat)
565 {
566 if( (queue_head = timer_queues[SPECIAL_QUEUE].queue_head) != NULL)
567 {
568 auxp = queue_head;
569 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
570 {
571 auxp->time_left -= deltat;
572 if(auxp->time_left > 0)
573 {
574 if(auxp->time_left < min_time)
575 {
576 min_time = auxp->time_left;
577 }
578 }
579 }
580 }
581 for( queue_id = 0; queue_id < MAX_TIMER_QUEUES; queue_id++ )
582 {
583 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
584 continue;
585 auxp = queue_head;
586 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
587 {
588 auxp->time_left -= deltat;
589 if(auxp->time_left > 0)
590 {
591 if(auxp->time_left < min_time)
592 {
593 min_time = auxp->time_left;
594 }
595 }
596 else
597 {
598 if(auxp->time < min_time)
599 {
600 min_time = auxp->time;
601 }
602 }
603 if((!deltat) && (min_time <= 1))
604 break;
605 }
606 }
607 }
608 if(min_time == 100000)
609 min_time = 0;
610 return min_time;
611}
612
613static int stop_it()
614{
615 int min_time;
616 int deltat = 0;
617
618 DISABLE_AST
619 if(Alarm_runs)
620 {
621 my_alarm(0);
622 deltat = get_elapsed_time();
623 if(deltat != 0)
624 DIM_last_time = get_current_time(&DIM_last_time_millies);
625 Alarm_runs = 0;
626 }
627 min_time = get_minimum(deltat);
628 ENABLE_AST
629 return(min_time);
630}
631
632static int start_it(int new_time)
633{
634 int next_time;
635 TIMR_ENT *queue_head;
636
637 DISABLE_AST
638 next_time = new_time;
639 if(next_time > 0)
640 {
641 queue_head = timer_queues[WRITE_QUEUE].queue_head;
642 if( dll_get_next((DLL *)queue_head,(DLL *)queue_head))
643 {
644 next_time = -10;
645 }
646 }
647 if(next_time)
648 {
649 my_alarm(next_time);
650 Alarm_runs = 1;
651 if(!DIM_last_time)
652 DIM_last_time = get_current_time(&DIM_last_time_millies);
653 }
654 else
655 DIM_last_time = 0;
656
657 ENABLE_AST
658 return(1);
659}
660
661static int scan_it()
662{
663 int queue_id, i, n = 0;
664 static int curr_queue_id = 0;
665 static TIMR_ENT *curr_entry = 0;
666 TIMR_ENT *auxp, *prevp, *queue_head;
667 TIMR_ENT *done[1024];
668
669 DISABLE_AST
670 queue_head = timer_queues[WRITE_QUEUE].queue_head;
671 if(!queue_head)
672 {
673 ENABLE_AST
674 return(0);
675 }
676 auxp = queue_head;
677 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
678 {
679 done[n++] = auxp;
680 if(n == 1000)
681 break;
682 }
683 ENABLE_AST
684 for(i = 0; i < n; i++)
685 {
686 auxp = done[i];
687 auxp->user_routine( auxp->tag );
688 }
689 {
690 DISABLE_AST
691 for(i = 0; i < n; i++)
692 {
693 auxp = done[i];
694 dll_remove(auxp);
695 free(auxp);
696 }
697 if(n == 1000)
698 {
699 ENABLE_AST
700 return(1);
701 }
702 ENABLE_AST
703 }
704 {
705 DISABLE_AST
706 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
707 auxp = queue_head;
708 prevp = auxp;
709 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
710 {
711 if(auxp->time_left <= 0)
712 {
713 dll_remove(auxp);
714 auxp->user_routine( auxp->tag );
715 free(auxp);
716 auxp = prevp;
717 n++;
718 if(n == 100)
719 {
720 ENABLE_AST
721 return(1);
722 }
723 }
724 else
725 prevp = auxp;
726 }
727 for( queue_id = curr_queue_id; queue_id < MAX_TIMER_QUEUES; queue_id++ )
728 {
729 if( (queue_head = timer_queues[queue_id].queue_head) == NULL )
730 continue;
731 Inside_ast = 1;
732 if((curr_entry) && (queue_id == curr_queue_id))
733 auxp = curr_entry;
734 else
735 auxp = queue_head;
736 while( (auxp = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)auxp)) )
737 {
738 if(auxp->time_left <= 0)
739 {
740 auxp->user_routine( auxp->tag );
741 auxp->time_left = auxp->time; /*restart clock*/
742 n++;
743 if(n == 100)
744 {
745 curr_queue_id = queue_id;
746 curr_entry = auxp;
747 ENABLE_AST
748 return(1);
749 }
750 }
751 }
752 Inside_ast = 0;
753 if( timer_queues[queue_id].remove_entries ) {
754 rem_deleted_entries( queue_id );
755 timer_queues[queue_id].remove_entries = 0;
756 }
757 }
758 curr_queue_id = 0;
759 curr_entry = 0;
760 ENABLE_AST
761 }
762 return(0);
763}
764
765static void alrm_sig_handler( int num)
766{
767 int next_time;
768
769 if(num){}
770 next_time = stop_it();
771 if(Threads_off)
772 {
773 if(scan_it())
774 next_time = -10;
775 }
776 else
777 {
778 while(scan_it());
779 }
780 if(!Alarm_runs)
781 {
782 start_it(next_time);
783 }
784}
785
/*
 * Callback that does nothing; dtq_add_entry() stores it in entries that
 * are created without a user routine.
 */
static void Std_timer_handler()
{
	return;
}
789
790void dtq_start_timer(int time, void (*user_routine)(), dim_long tag)
791{
792 extern void dim_init_threads();
793
794 if(!Threads_off)
795 {
796 dim_init_threads();
797 }
798 dim_dtq_init(0);
799 if(time != 0)
800 dtq_add_entry(SPECIAL_QUEUE, time, user_routine, tag);
801 else
802 dtq_add_entry(WRITE_QUEUE, time, user_routine, tag);
803}
804
805
806int dtq_stop_timer(dim_long tag)
807{
808 TIMR_ENT *entry, *queue_head;
809 int time_left = -1;
810
811 queue_head = timer_queues[SPECIAL_QUEUE].queue_head;
812 entry = queue_head;
813 while( (entry = (TIMR_ENT *)dll_get_next((DLL *)queue_head,(DLL *)entry)) )
814 {
815 if( entry->tag == tag )
816 {
817 time_left = dtq_rem_entry( SPECIAL_QUEUE, entry );
818 break;
819 }
820 }
821 return(time_left);
822}
823
824static int Dtq_sleeping = 0;
825
826void dtq_sleep_rout(dim_long tag)
827{
828 if(tag){}
829 Dtq_sleeping = 0;
830#ifdef WIN32
831 wake_up();
832#endif
833}
834
835#ifndef WIN32
836
/*
 * Sleep for secs seconds (non-WIN32 build).  Always returns 0.
 *
 * Without NOTHREADS the time is spent in secs*2 calls of
 * dim_usleep(500000).  With NOTHREADS, SIGALRM is unblocked, a timer with
 * dtq_sleep_rout as callback is started and pause() is repeated until the
 * callback has cleared Dtq_sleeping; the previous signal mask is restored
 * afterwards.
 */
unsigned int dtq_sleep(int secs)
{

#ifndef NOTHREADS
	int half_secs_left;

	for( half_secs_left = secs*2; half_secs_left > 0; half_secs_left-- )
		dim_usleep(500000);
	return(0);
#else
	sigset_t alarm_only, saved_mask;

	sigemptyset(&alarm_only);
	sigaddset(&alarm_only,SIGALRM);
	sigprocmask(SIG_UNBLOCK, &alarm_only, &saved_mask);
	Dtq_sleeping = 1;
	dtq_start_timer(secs, dtq_sleep_rout, (dim_long)123);
	for(;;)
	{
		pause();
		if( !Dtq_sleeping )
			break;
	}
	sigprocmask(SIG_SETMASK,&saved_mask,0);
	return(0);
#endif
}
862
863#else
864
865unsigned int dtq_sleep(int secs)
866{
867 Dtq_sleeping = 1;
868 dtq_start_timer(secs, dtq_sleep_rout, 1);
869 do{
870 dim_wait();
871 }while(Dtq_sleeping);
872 return(0);
873}
874
875#endif
Note: See TracBrowser for help on using the repository browser.