Improve timing behavior of ticking in loops
This commit is contained in:
@@ -34,6 +34,8 @@ type looper struct {
|
|||||||
start chan struct{}
|
start chan struct{}
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
updateTicker chan struct{}
|
updateTicker chan struct{}
|
||||||
|
timeNow func() time.Time
|
||||||
|
timeSince func(time.Time) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLooper(conf Configurator, settings settings.DNS, logger logging.Logger,
|
func NewLooper(conf Configurator, settings settings.DNS, logger logging.Logger,
|
||||||
@@ -49,6 +51,8 @@ func NewLooper(conf Configurator, settings settings.DNS, logger logging.Logger,
|
|||||||
start: make(chan struct{}),
|
start: make(chan struct{}),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
updateTicker: make(chan struct{}),
|
updateTicker: make(chan struct{}),
|
||||||
|
timeNow: time.Now,
|
||||||
|
timeSince: time.Since,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -285,24 +289,45 @@ func (l *looper) useUnencryptedDNS(fallback bool) {
|
|||||||
|
|
||||||
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ticker := time.NewTicker(time.Hour)
|
// Timer that acts as a ticker
|
||||||
|
timer := time.NewTimer(time.Hour)
|
||||||
|
timer.Stop()
|
||||||
|
timerIsStopped := true
|
||||||
settings := l.GetSettings()
|
settings := l.GetSettings()
|
||||||
if settings.UpdatePeriod > 0 {
|
if settings.UpdatePeriod > 0 {
|
||||||
ticker = time.NewTicker(settings.UpdatePeriod)
|
timer.Reset(settings.UpdatePeriod)
|
||||||
} else {
|
timerIsStopped = false
|
||||||
ticker.Stop()
|
|
||||||
}
|
}
|
||||||
|
lastTick := time.Unix(0, 0)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ticker.Stop()
|
if !timerIsStopped && !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-timer.C:
|
||||||
|
lastTick = l.timeNow()
|
||||||
l.restart <- struct{}{}
|
l.restart <- struct{}{}
|
||||||
|
settings := l.GetSettings()
|
||||||
|
timer.Reset(settings.UpdatePeriod)
|
||||||
case <-l.updateTicker:
|
case <-l.updateTicker:
|
||||||
ticker.Stop()
|
if !timer.Stop() {
|
||||||
period := l.GetSettings().UpdatePeriod
|
<-timer.C
|
||||||
ticker = time.NewTicker(period)
|
}
|
||||||
|
timerIsStopped = true
|
||||||
|
settings := l.GetSettings()
|
||||||
|
newUpdatePeriod := settings.UpdatePeriod
|
||||||
|
if newUpdatePeriod == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var waited time.Duration
|
||||||
|
if lastTick.UnixNano() != 0 {
|
||||||
|
waited = l.timeSince(lastTick)
|
||||||
|
}
|
||||||
|
leftToWait := newUpdatePeriod - waited
|
||||||
|
timer.Reset(leftToWait)
|
||||||
|
timerIsStopped = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -151,11 +151,9 @@ func (p *piaV4) PortForward(ctx context.Context, client *http.Client,
|
|||||||
}
|
}
|
||||||
|
|
||||||
expiryTimer := time.NewTimer(durationToExpiration)
|
expiryTimer := time.NewTimer(durationToExpiration)
|
||||||
defer expiryTimer.Stop()
|
|
||||||
const keepAlivePeriod = 15 * time.Minute
|
const keepAlivePeriod = 15 * time.Minute
|
||||||
keepAliveTicker := time.NewTicker(keepAlivePeriod)
|
// Timer behaving as a ticker
|
||||||
defer keepAliveTicker.Stop()
|
keepAliveTimer := time.NewTimer(keepAlivePeriod)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -164,11 +162,18 @@ func (p *piaV4) PortForward(ctx context.Context, client *http.Client,
|
|||||||
if err := fw.RemoveAllowedPort(removeCtx, data.Port); err != nil {
|
if err := fw.RemoveAllowedPort(removeCtx, data.Port); err != nil {
|
||||||
pfLogger.Error(err)
|
pfLogger.Error(err)
|
||||||
}
|
}
|
||||||
|
if !keepAliveTimer.Stop() {
|
||||||
|
<-keepAliveTimer.C
|
||||||
|
}
|
||||||
|
if !expiryTimer.Stop() {
|
||||||
|
<-expiryTimer.C
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-keepAliveTicker.C:
|
case <-keepAliveTimer.C:
|
||||||
if err := bindPIAPort(client, gateway, data); err != nil {
|
if err := bindPIAPort(client, gateway, data); err != nil {
|
||||||
pfLogger.Error(err)
|
pfLogger.Error(err)
|
||||||
}
|
}
|
||||||
|
keepAliveTimer.Reset(keepAlivePeriod)
|
||||||
case <-expiryTimer.C:
|
case <-expiryTimer.C:
|
||||||
pfLogger.Warn("Forward port has expired on %s, getting another one", data.Expiration.Format(time.RFC1123))
|
pfLogger.Warn("Forward port has expired on %s, getting another one", data.Expiration.Format(time.RFC1123))
|
||||||
oldPort := data.Port
|
oldPort := data.Port
|
||||||
@@ -199,7 +204,10 @@ func (p *piaV4) PortForward(ctx context.Context, client *http.Client,
|
|||||||
if err := bindPIAPort(client, gateway, data); err != nil {
|
if err := bindPIAPort(client, gateway, data); err != nil {
|
||||||
pfLogger.Error(err)
|
pfLogger.Error(err)
|
||||||
}
|
}
|
||||||
keepAliveTicker.Reset(keepAlivePeriod)
|
if !keepAliveTimer.Stop() {
|
||||||
|
<-keepAliveTimer.C
|
||||||
|
}
|
||||||
|
keepAliveTimer.Reset(keepAlivePeriod)
|
||||||
expiryTimer.Reset(durationToExpiration)
|
expiryTimer.Reset(durationToExpiration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ type looper struct {
|
|||||||
restart chan struct{}
|
restart chan struct{}
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
updateTicker chan struct{}
|
updateTicker chan struct{}
|
||||||
|
timeNow func() time.Time
|
||||||
|
timeSince func(time.Time) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLooper(client network.Client, logger logging.Logger, fileManager files.FileManager,
|
func NewLooper(client network.Client, logger logging.Logger, fileManager files.FileManager,
|
||||||
@@ -47,6 +49,8 @@ func NewLooper(client network.Client, logger logging.Logger, fileManager files.F
|
|||||||
restart: make(chan struct{}),
|
restart: make(chan struct{}),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
updateTicker: make(chan struct{}),
|
updateTicker: make(chan struct{}),
|
||||||
|
timeNow: time.Now,
|
||||||
|
timeSince: time.Since,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,23 +131,42 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ticker := time.NewTicker(time.Hour)
|
timer := time.NewTimer(time.Hour)
|
||||||
|
timer.Stop() // 1 hour, cannot be a race condition
|
||||||
|
timerIsStopped := true
|
||||||
period := l.GetPeriod()
|
period := l.GetPeriod()
|
||||||
if period > 0 {
|
if period > 0 {
|
||||||
ticker = time.NewTicker(period)
|
timer.Reset(period)
|
||||||
} else {
|
timerIsStopped = false
|
||||||
ticker.Stop()
|
|
||||||
}
|
}
|
||||||
|
lastTick := time.Unix(0, 0)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ticker.Stop()
|
if !timerIsStopped && !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-timer.C:
|
||||||
|
lastTick = l.timeNow()
|
||||||
l.restart <- struct{}{}
|
l.restart <- struct{}{}
|
||||||
|
timer.Reset(l.GetPeriod())
|
||||||
case <-l.updateTicker:
|
case <-l.updateTicker:
|
||||||
ticker.Stop()
|
if !timer.Stop() {
|
||||||
ticker = time.NewTicker(l.GetPeriod())
|
<-timer.C
|
||||||
|
}
|
||||||
|
timerIsStopped = true
|
||||||
|
period := l.GetPeriod()
|
||||||
|
if period == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var waited time.Duration
|
||||||
|
if lastTick.UnixNano() > 0 {
|
||||||
|
waited = l.timeSince(lastTick)
|
||||||
|
}
|
||||||
|
leftToWait := period - waited
|
||||||
|
timer.Reset(leftToWait)
|
||||||
|
timerIsStopped = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ type looper struct {
|
|||||||
restart chan struct{}
|
restart chan struct{}
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
updateTicker chan struct{}
|
updateTicker chan struct{}
|
||||||
|
timeNow func() time.Time
|
||||||
|
timeSince func(time.Time) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLooper(options Options, period time.Duration, currentServers models.AllServers,
|
func NewLooper(options Options, period time.Duration, currentServers models.AllServers,
|
||||||
@@ -45,6 +47,8 @@ func NewLooper(options Options, period time.Duration, currentServers models.AllS
|
|||||||
restart: make(chan struct{}),
|
restart: make(chan struct{}),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
updateTicker: make(chan struct{}),
|
updateTicker: make(chan struct{}),
|
||||||
|
timeNow: time.Now,
|
||||||
|
timeSince: time.Since,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,23 +129,41 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
func (l *looper) RunRestartTicker(ctx context.Context, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ticker := time.NewTicker(time.Hour)
|
timer := time.NewTimer(time.Hour)
|
||||||
period := l.GetPeriod()
|
timer.Stop()
|
||||||
if period > 0 {
|
timerIsStopped := true
|
||||||
ticker = time.NewTicker(period)
|
if period := l.GetPeriod(); period > 0 {
|
||||||
} else {
|
timerIsStopped = false
|
||||||
ticker.Stop()
|
timer.Reset(period)
|
||||||
}
|
}
|
||||||
|
lastTick := time.Unix(0, 0)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ticker.Stop()
|
if !timerIsStopped && !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-timer.C:
|
||||||
|
lastTick = l.timeNow()
|
||||||
l.restart <- struct{}{}
|
l.restart <- struct{}{}
|
||||||
|
timer.Reset(l.GetPeriod())
|
||||||
case <-l.updateTicker:
|
case <-l.updateTicker:
|
||||||
ticker.Stop()
|
if !timerIsStopped && !timer.Stop() {
|
||||||
ticker = time.NewTicker(l.GetPeriod())
|
<-timer.C
|
||||||
|
}
|
||||||
|
timerIsStopped = true
|
||||||
|
period := l.GetPeriod()
|
||||||
|
if period == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var waited time.Duration
|
||||||
|
if lastTick.UnixNano() > 0 {
|
||||||
|
waited = l.timeSince(lastTick)
|
||||||
|
}
|
||||||
|
leftToWait := period - waited
|
||||||
|
timer.Reset(leftToWait)
|
||||||
|
timerIsStopped = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user