diff --git a/GSTExtensions.st b/GSTExtensions.st index 463fae5..0a6a3cd 100644 --- a/GSTExtensions.st +++ b/GSTExtensions.st @@ -82,3 +82,13 @@ Object extend [ "Compat for pharo. Use it to indicate deprecated functions" ] ] + +Duration extend [ + asMilliSeconds [ + ^self asMilliseconds + ] + + asDelay [ + ^Delay forMilliseconds: self asMilliseconds + ] +] diff --git a/Tests.st b/Tests.st index 35d2cb5..4e88ec0 100644 --- a/Tests.st +++ b/Tests.st @@ -32,14 +32,24 @@ TestCase subclass: DispatcherTest [ ] ] -TestCase subclass: TimerTest [ +TestCase subclass: TimerSchedulerTest [ + | timerScheduler | + + tearDown [ + timerScheduler doShutDown + ] + + setUp [ + timerScheduler := TimerScheduler new + ] + testTimer [ | sem now | now := DateTime now. sem := Semaphore new. - TimerScheduler instance scheduleInSeconds: 2 block: [ + timerScheduler scheduleInSeconds: 2 block: [ sem signal. ]. @@ -53,14 +63,13 @@ TestCase subclass: TimerTest [ block := [sem signal]. - fire1 := TimerScheduler instance scheduleInSeconds: 5 block: block. - timer1 := TimerScheduler instance scheduleInSeconds: 3 block: block. - timer2 := TimerScheduler instance scheduleInSeconds: 2 block: block. - + fire1 := timerScheduler scheduleInSeconds: 5 block: block. + timer1 := timerScheduler scheduleInSeconds: 3 block: block. + timer2 := timerScheduler scheduleInSeconds: 2 block: block. timer2 cancel. timer1 cancel. sem wait. - self assert: sem signals = 0 + self assert: sem signals equals: 0 ] ] diff --git a/Timer.st b/Timer.st index 909f77b..8d6adeb 100644 --- a/Timer.st +++ b/Timer.st @@ -65,10 +65,15 @@ Object subclass: Timer [ ^ schedule == nil. ] + + remainingTime [ + + ^timeout - DateTime now + ] ] Object subclass: TimerScheduler [ - | queue sem loop quit processExited | + | queue sem loop quit processExited delay | TimerScheduler class >> new [ - ^ super new + ^self basicNew initialize; addToBeFinalized; yourself ] + TimerScheduler class >> processName [ + + ^'Osmo Timers' + ] + + doStartUp [ + "Nothing for GST" + ] + + doShutDown [ + "Nothing for GST" + ] + + dispatchTimers [ + + OsmoDispatcher dispatchBlock: [self fireTimers: DateTime now] + ] + finalize [ quit := true. @@ -105,39 +128,70 @@ bit difficult to do this race free.'> startLoop [ - processExited := Semaphore new. - - loop := [[self runTimers] ensure: [ - processExited signal. - loop := nil]] newProcess. - loop name: 'Osmo Timers'. - loop resume. + loop := [[self runTimers] ensure: + [processExited signal. + loop := nil]] newProcess. + loop name: self class processName. + loop resume ] - scheduleInSeconds: aDelay block: aBlock [ - | sched | + signalDelay [ + "Called with sem critical being consumed" + delay ifNotNil: [delay signal]. + ] + + scheduleIn: aDuration block: aBlock [ - sched := (Timer on: self) + | timer currentFirst | + timer := (Timer on: self) block: aBlock; - timeout: (DateTime now + (Duration milliseconds: 1000 * aDelay)); + timeout: DateTime now + aDuration; yourself. sem critical: [ - queue add: sched. - ]. + currentFirst := queue isEmpty ifFalse: [queue first]. + queue add: timer. + loop isSuspended + ifTrue: [loop resume] + ifFalse: [ + "if the first item in the queue has changed we need to readjust the delay + to wait for. Signalling the waiting delay will enter the recalculation of a new + expire time" - ^ sched + currentFirst == queue first + ifFalse: [self signalDelay]]]. + ^timer + ] + + scheduleInSeconds: aNumber block: aBlock [ + + ^self scheduleIn: (Duration fromSeconds: aNumber) block: aBlock ] runTimers [ - + - [quit] whileFalse: [ | now | - (Delay forSeconds: 1) wait. - now := DateTime now. - OsmoDispatcher dispatchBlock: [self fireTimers: now]. - ] + [quit] whileFalse: [ + | timer | + sem critical: [queue isEmpty ifFalse: [timer := queue first]]. + timer isNil + ifTrue: [ + "nothing to do. No need to poll an empty queue. Remove delay to get rid of + a false resumptionTime. Suspend the process. The process will be resumed + when an item is added. Please note that Processor activeProcess == loop will + hold here." + + delay := nil. + loop suspend] + ifFalse: [ + "either a timer has expired and we process it or we wait for the first item in + the queue to expire" + + | offset | + (offset := timer remainingTime) asMilliSeconds > 0 + ifTrue: [(delay := offset asDelay) wait] + ifFalse: [self dispatchTimers]]] ] fireTimers: now [ diff --git a/changes_for_pharo.st b/changes_for_pharo.st index 614539f..3f43e61 100644 --- a/changes_for_pharo.st +++ b/changes_for_pharo.st @@ -19,8 +19,12 @@ TimerScheduler extend [ loop ifNil: [^self]. quit := true. + loop isSuspended ifTrue: [loop resume]. + delay ifNotNil: [:the_delay | the_delay signalWaitingProcess]. processExited wait. - Transcript nextPutAll: 'Stopped the TimerScheduler process'; cr. + Transcript + nextPutAll: 'Stopped the TimerScheduler process'; + cr ] doStartUp [ @@ -30,6 +34,16 @@ TimerScheduler extend [ quit := false. self startLoop. ] + + signalDelay [ + "Called with sem critical being consumed" + delay ifNotNil: [delay signalWaitingProcess]. + ] + + scheduleInSeconds: aNumber block: aBlock [ + + ^self scheduleIn: aNumber seconds block: aBlock + ] ] Dispatcher class extend [ diff --git a/package.xml b/package.xml index e2abb18..653d859 100644 --- a/package.xml +++ b/package.xml @@ -10,7 +10,7 @@ Osmo.DispatcherTest - Osmo.TimerTest + Osmo.TimerSchedulerTest Tests.st Osmo.StringFormatTest ExtensionTest.st