fix(publicip): rework run loop and fix restarts

- Clearing IP data on VPN disconnection clears file
- More efficient partial updates
- Fix loop exit
- Validate settings before updating
This commit is contained in:
Quentin McGaw
2023-09-24 14:55:51 +00:00
parent e64e5af4c3
commit f96448947f
21 changed files with 308 additions and 380 deletions

View File

@@ -377,7 +377,10 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
portForwardLogger := logger.New(log.SetComponent("port forwarding"))
portForwardLooper := portforward.NewLoop(allSettings.VPN.Provider.PortForwarding,
routingConf, httpClient, firewallConf, portForwardLogger, puid, pgid)
portForwardRunError, _ := portForwardLooper.Start(context.Background())
portForwardRunError, err := portForwardLooper.Start(ctx)
if err != nil {
return fmt.Errorf("starting port forwarding loop: %w", err)
}
unboundLogger := logger.New(log.SetComponent("dns"))
unboundLooper := dns.NewLoop(dnsConf, allSettings.DNS, httpClient,
@@ -397,15 +400,10 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
publicIPLooper := publicip.NewLoop(ipFetcher,
logger.New(log.SetComponent("ip getter")),
allSettings.PublicIP, puid, pgid)
pubIPHandler, pubIPCtx, pubIPDone := goshutdown.NewGoRoutineHandler(
"public IP", goroutine.OptionTimeout(defaultShutdownTimeout))
go publicIPLooper.Run(pubIPCtx, pubIPDone)
otherGroupHandler.Add(pubIPHandler)
pubIPTickerHandler, pubIPTickerCtx, pubIPTickerDone := goshutdown.NewGoRoutineHandler(
"public IP", goroutine.OptionTimeout(defaultShutdownTimeout))
go publicIPLooper.RunRestartTicker(pubIPTickerCtx, pubIPTickerDone)
tickersGroupHandler.Add(pubIPTickerHandler)
publicIPRunError, err := publicIPLooper.Start(ctx)
if err != nil {
return fmt.Errorf("starting public ip loop: %w", err)
}
updaterLogger := logger.New(log.SetComponent("updater"))
@@ -487,12 +485,22 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
select {
case <-ctx.Done():
err = portForwardLooper.Stop()
if err != nil {
logger.Error("stopping port forward loop: " + err.Error())
stoppers := []interface {
String() string
Stop() error
}{
portForwardLooper, publicIPLooper,
}
for _, stopper := range stoppers {
err := stopper.Stop()
if err != nil {
logger.Error(fmt.Sprintf("stopping %s: %s", stopper, err))
}
}
case err := <-portForwardRunError:
logger.Errorf("port forwarding loop crashed: %s", err)
case err := <-publicIPRunError:
logger.Errorf("public IP loop crashed: %s", err)
}
return orderHandler.Shutdown(context.Background())

View File

@@ -23,6 +23,20 @@ type PublicIP struct {
IPFilepath *string
}
// UpdateWith produces new settings from the receiver and a partial update.
// It works on a copy of the receiver (obtained with copy), applies the
// fields of partialUpdate on that copy with overrideWith and validates
// the result. The receiver itself is never modified. When validation
// fails, the wrapped validation error is returned together with the
// merged settings that failed validation.
func (p PublicIP) UpdateWith(partialUpdate PublicIP) (updatedSettings PublicIP, err error) {
	updatedSettings = p.copy()
	updatedSettings.overrideWith(partialUpdate)

	if validationErr := updatedSettings.validate(); validationErr != nil {
		err = fmt.Errorf("validating updated settings: %w", validationErr)
	}
	return updatedSettings, err
}
func (p PublicIP) validate() (err error) {
const minPeriod = 5 * time.Second
if *p.Period < minPeriod {

View File

@@ -48,6 +48,10 @@ func NewLoop(settings settings.PortForwarding, routing Routing,
}
}
// String returns the fixed, human readable name of the loop,
// which makes the loop usable wherever a fmt.Stringer is expected.
func (l *Loop) String() string {
	const name = "port forwarding loop"
	return name
}
func (l *Loop) Start(_ context.Context) (runError <-chan error, _ error) {
l.runCtx, l.runCancel = context.WithCancel(context.Background())
runDone := make(chan struct{})

24
internal/publicip/data.go Normal file
View File

@@ -0,0 +1,24 @@
package publicip
import "github.com/qdm12/gluetun/internal/models"
// GetData returns the public IP data stored by the loop, which is
// the data obtained from the last fetch. It is notably used by the
// HTTP control server. The data is read under the IP data read lock.
func (l *Loop) GetData() (data models.PublicIP) {
	l.ipDataMutex.RLock()
	data = l.ipData
	l.ipDataMutex.RUnlock()
	return data
}
// ClearData is used when the VPN connection goes down and the public
// IP is not known anymore. It resets the in-memory IP data to its zero
// value and then calls persistPublicIP with an empty content on the
// currently configured IP file path. The IP data lock is held for the
// whole call; the settings lock is only held to read the file path.
func (l *Loop) ClearData() (err error) {
	l.ipDataMutex.Lock()
	defer l.ipDataMutex.Unlock()

	var noData models.PublicIP
	l.ipData = noData

	ipFilepath := func() string {
		l.settingsMutex.RLock()
		defer l.settingsMutex.RUnlock()
		return *l.settings.IPFilepath
	}()

	err = persistPublicIP(ipFilepath, "", l.puid, l.pgid)
	return err
}

View File

@@ -1,7 +0,0 @@
package publicip
import "errors"
// ErrBadStatusCode is the sentinel error for a bad HTTP status.
var ErrBadStatusCode = errors.New("bad HTTP status")

View File

@@ -1,22 +0,0 @@
package publicip
import (
"context"
"time"
)
// logAndWait logs err at the error level when it is not nil, logs the
// current backoff duration, then waits for that duration or until ctx
// is done, whichever comes first. The stored backoff duration is doubled
// on every call, before the wait starts.
func (l *Loop) logAndWait(ctx context.Context, err error) {
	if err != nil {
		l.logger.Error(err.Error())
	}

	wait := l.backoffTime
	l.logger.Info("retrying in " + wait.String())
	timer := time.NewTimer(wait)
	l.backoffTime = 2 * wait

	select {
	case <-ctx.Done():
		if stopped := timer.Stop(); stopped {
			return
		}
		// The timer fired concurrently with the cancellation:
		// consume its tick so nothing is left pending in the channel.
		<-timer.C
	case <-timer.C:
	}
}

View File

@@ -11,3 +11,9 @@ type Fetcher interface {
FetchInfo(ctx context.Context, ip netip.Addr) (
result ipinfo.Response, err error)
}
// Logger groups the logging methods, one per level name
// (Info, Warn and Error), each taking the message as a string.
type Logger interface {
Info(s string)
Warn(s string)
Error(s string)
}

View File

@@ -1,7 +0,0 @@
package publicip
// Logger groups the logging methods, one per level name
// (Info, Warn and Error), each taking the message as a string.
type Logger interface {
Info(s string)
Warn(s string)
Error(s string)
}

View File

@@ -1,64 +1,172 @@
package publicip
import (
"context"
"fmt"
"net/netip"
"sync"
"time"
"github.com/qdm12/gluetun/internal/configuration/settings"
"github.com/qdm12/gluetun/internal/constants"
"github.com/qdm12/gluetun/internal/loopstate"
"github.com/qdm12/gluetun/internal/models"
"github.com/qdm12/gluetun/internal/publicip/state"
"github.com/qdm12/gluetun/internal/publicip/ipinfo"
)
// Loop holds the settings, the last public IP data, the injected
// fetcher and logger, and the channels used to drive the public IP
// fetching goroutine.
// NOTE(review): this listing appears to show both the previous fields
// (status manager, state, start/stop channels, backoff) and the new ones
// (settings, ipData, run* channels) together — confirm against the real file.
type Loop struct {
statusManager *loopstate.State
state *state.State
// Objects
// State
settings settings.PublicIP
settingsMutex sync.RWMutex
ipData models.PublicIP
ipDataMutex sync.RWMutex
// Fixed injected objects
fetcher Fetcher
logger Logger
// Fixed settings
// Fixed parameters
puid int
pgid int
// Internal channels and locks
start chan struct{}
running chan models.LoopStatus
stop chan struct{}
stopped chan struct{}
updateTicker chan struct{}
backoffTime time.Duration
userTrigger bool
// runCtx is used to detect when the loop has exited
// when performing an update
runCtx context.Context //nolint:containedctx
runCancel context.CancelFunc
runTrigger chan<- struct{}
updateTrigger chan<- settings.PublicIP
updatedResult <-chan error
runDone <-chan struct{}
// Mock functions
timeNow func() time.Time
}
const defaultBackoffTime = 5 * time.Second
// NewLoop creates a public IP loop from the given fetcher, logger and
// settings. puid and pgid are stored as is and timeNow is set to time.Now.
// NOTE(review): several keys (fetcher, logger, puid, pgid, timeNow) appear
// twice in the literal below as shown; this looks like removed and added
// lines rendered together — confirm against the real file.
func NewLoop(fetcher Fetcher, logger Logger,
settings settings.PublicIP, puid, pgid int) *Loop {
start := make(chan struct{})
running := make(chan models.LoopStatus)
stop := make(chan struct{})
stopped := make(chan struct{})
updateTicker := make(chan struct{})
statusManager := loopstate.New(constants.Stopped, start, running, stop, stopped)
state := state.New(statusManager, settings, updateTicker)
return &Loop{
statusManager: statusManager,
state: state,
// Objects
fetcher: fetcher,
logger: logger,
puid: puid,
pgid: pgid,
start: start,
running: running,
stop: stop,
stopped: stopped,
updateTicker: updateTicker,
userTrigger: true,
backoffTime: defaultBackoffTime,
timeNow: time.Now,
settings: settings,
fetcher: fetcher,
logger: logger,
puid: puid,
pgid: pgid,
timeNow: time.Now,
}
}
// String returns the fixed, human readable name of the loop,
// which makes the loop usable wherever a fmt.Stringer is expected.
func (l *Loop) String() string {
	const name = "public ip loop"
	return name
}
// Start creates the run context and the channels used to communicate
// with the run goroutine, stores them in the loop and launches that
// goroutine. The context argument is ignored: the run context derives
// from context.Background() and is cancelled by runCancel. Both returned
// values are always nil.
func (l *Loop) Start(_ context.Context) (_ <-chan error, err error) {
	var (
		runDone       = make(chan struct{})
		runTrigger    = make(chan struct{})
		updateTrigger = make(chan settings.PublicIP)
		updatedResult = make(chan error)
	)

	runCtx, runCancel := context.WithCancel(context.Background())
	l.runCtx, l.runCancel = runCtx, runCancel

	// The loop only keeps the directional end it needs of each channel;
	// the run goroutine receives the opposite ends.
	l.runDone = runDone
	l.runTrigger = runTrigger
	l.updateTrigger = updateTrigger
	l.updatedResult = updatedResult

	go l.run(runCtx, runDone, runTrigger, updateTrigger, updatedResult)

	return nil, nil //nolint:nilnil
}
// run is the body of the goroutine launched by Start. It closes runDone
// when it returns. On each iteration it waits for one of: the run context
// being done (it returns), a run trigger or a timer tick (it fetches the
// public IP data), or a settings update (it applies the update, sends the
// outcome on updatedResult and goes back to waiting without fetching).
// After a fetch it logs the result, stores it under the IP data lock,
// persists the IP address to the configured file path and re-arms the timer
// using the configured period.
func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
runTrigger <-chan struct{}, updateTrigger <-chan settings.PublicIP,
updatedResult chan<- error) {
defer close(runDone)
// The timer is created and immediately stopped: it is only armed
// later on by updateTimer (directly or through update).
timer := time.NewTimer(time.Hour)
defer timer.Stop()
_ = timer.Stop()
// timerIsReadyToReset tracks whether the timer is currently not armed
// (stopped, or fired with its tick consumed below).
timerIsReadyToReset := true
// time.Unix(0, 0) is the "never fetched" marker checked by updateTimer.
lastFetch := time.Unix(0, 0)
for {
select {
case <-runCtx.Done():
return
case <-runTrigger:
case <-timer.C:
// The tick has been consumed, so the timer can be reset directly.
timerIsReadyToReset = true
case partialUpdate := <-updateTrigger:
var err error
timerIsReadyToReset, err = l.update(partialUpdate, lastFetch, timer, timerIsReadyToReset)
updatedResult <- err
continue
}
// fetchIPData retries until it succeeds; exit is true only when
// the run context got cancelled in the meantime.
result, exit := l.fetchIPData(runCtx)
if exit {
return
}
message := "Public IP address is " + result.IP.String()
message += " (" + result.Country + ", " + result.Region + ", " + result.City + ")"
l.logger.Info(message)
l.ipDataMutex.Lock()
l.ipData = result.ToPublicIPModel()
l.ipDataMutex.Unlock()
// l.settings is read without locking here, in the same goroutine
// that update runs in.
filepath := *l.settings.IPFilepath
err := persistPublicIP(filepath, result.IP.String(), l.puid, l.pgid)
if err != nil { // non critical error, which can be fixed with settings updates.
l.logger.Error(err.Error())
}
lastFetch = l.timeNow()
timerIsReadyToReset = l.updateTimer(*l.settings.Period, lastFetch, timer, timerIsReadyToReset)
}
}
// fetchIPData fetches the public IP information with the loop fetcher,
// retrying until it succeeds or ctx is done. Between attempts it logs
// the error and waits for a backoff duration which starts at 5 seconds
// and doubles after every failure.
// It returns exit set to true when ctx is done, in which case result
// must not be used.
func (l *Loop) fetchIPData(ctx context.Context) (result ipinfo.Response, exit bool) {
	// keep retrying since settings updates won't change the
	// behavior of the following code.
	const (
		defaultBackoffTime    = 5 * time.Second
		backoffTimeMultiplier = 2
	)
	backoffTime := defaultBackoffTime
	for {
		var err error
		result, err = l.fetcher.FetchInfo(ctx, netip.Addr{})
		if err == nil {
			return result, false
		}

		if ctx.Err() != nil {
			return result, true
		}

		l.logger.Error(fmt.Sprintf("%s - retrying in %s", err, backoffTime))

		// Use an explicit timer instead of time.After so it can be
		// released as soon as the context is done, instead of staying
		// alive for the whole (possibly very long) backoff duration.
		timer := time.NewTimer(backoffTime)
		select {
		case <-ctx.Done():
			timer.Stop()
			return result, true
		case <-timer.C:
		}

		backoffTime *= backoffTimeMultiplier
	}
}
// StartSingleRun asks the run goroutine to fetch the public IP data once.
// It blocks until the run goroutine accepts the trigger, or returns
// without triggering anything if the loop has been stopped.
// It must only be called after Start.
func (l *Loop) StartSingleRun() {
	select {
	case l.runTrigger <- struct{}{}:
	case <-l.runCtx.Done():
		// The loop has been stopped: nothing receives from runTrigger
		// anymore, so return instead of blocking the caller forever.
	}
}
// UpdateWith sends the partial settings update to the run goroutine and
// waits for its outcome, which is returned as err. If the run context is
// done before the update is accepted or before its outcome is available,
// the run context error is returned instead.
func (l *Loop) UpdateWith(partialUpdate settings.PublicIP) (err error) {
	// First step: hand over the update to the run goroutine.
	select {
	case l.updateTrigger <- partialUpdate:
	case <-l.runCtx.Done():
		// loop has been stopped, no update can be done
		return l.runCtx.Err()
	}

	// Second step: wait for the run goroutine to report the outcome.
	select {
	case err = <-l.updatedResult:
		return err
	case <-l.runCtx.Done():
		return l.runCtx.Err()
	}
}
// Stop cancels the run context, waits for the run goroutine to signal
// it is done and then clears the public IP data with ClearData, whose
// error is returned.
func (l *Loop) Stop() (err error) {
	cancelRun, runFinished := l.runCancel, l.runDone
	cancelRun()
	<-runFinished
	err = l.ClearData()
	return err
}

View File

@@ -1,13 +0,0 @@
package publicip
import (
"github.com/qdm12/gluetun/internal/models"
)
// GetData returns the public IP data held by the loop state.
func (l *Loop) GetData() (data models.PublicIP) {
	data = l.state.GetData()
	return data
}
// SetData stores the given public IP data in the loop state.
func (l *Loop) SetData(data models.PublicIP) {
	loopState := l.state
	loopState.SetData(data)
}

View File

@@ -1,100 +0,0 @@
package publicip
import (
"context"
"errors"
"net/netip"
"os"
"github.com/qdm12/gluetun/internal/constants"
"github.com/qdm12/gluetun/internal/models"
"github.com/qdm12/gluetun/internal/publicip/ipinfo"
)
// Run is the public IP loop body. It closes done when it returns.
// It first waits for a signal on the start channel (or returns if ctx is
// done). Then, for as long as ctx is not done, each outer iteration
// launches a goroutine fetching the public IP information and handles,
// in the inner loop, the context cancellation, start and stop signals,
// the fetch result and the fetch error.
func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
defer close(done)
select {
case <-l.start:
case <-ctx.Done():
return
}
for ctx.Err() == nil {
getCtx, getCancel := context.WithCancel(ctx)
// This defer is inside the outer loop, so each deferred call only
// runs when Run returns, not at the end of the iteration.
defer getCancel()
resultCh := make(chan models.PublicIP)
errorCh := make(chan error)
go func() {
result, err := l.fetcher.FetchInfo(getCtx, netip.Addr{})
if err != nil {
// The error is only reported when the fetch was not cancelled.
if getCtx.Err() == nil {
errorCh <- err
}
return
}
resultCh <- result.ToPublicIPModel()
}()
if l.userTrigger {
l.userTrigger = false
l.running <- constants.Running
} else { // crash
l.backoffTime = defaultBackoffTime
l.statusManager.SetStatus(constants.Running)
}
stayHere := true
for stayHere {
select {
case <-ctx.Done():
getCancel()
close(errorCh)
filepath := *l.state.GetSettings().IPFilepath
l.logger.Info("Removing ip file " + filepath)
if err := os.Remove(filepath); err != nil {
l.logger.Error(err.Error())
}
return
case <-l.start:
// Restart requested: cancel the current fetch and start a new
// outer iteration.
l.userTrigger = true
getCancel()
stayHere = false
case <-l.stop:
l.userTrigger = true
l.logger.Info("stopping")
getCancel()
// NOTE(review): the fetch goroutine neither sends on nor closes
// errorCh once getCtx is cancelled, so this receive looks like
// it can block — confirm.
<-errorCh
l.stopped <- struct{}{}
case result := <-resultCh:
getCancel()
message := "Public IP address is " + result.IP.String()
message += " (" + result.Country + ", " + result.Region + ", " + result.City + ")"
l.logger.Info(message)
l.state.SetData(result)
filepath := *l.state.GetSettings().IPFilepath
err := persistPublicIP(filepath, result.IP.String(), l.puid, l.pgid)
if err != nil {
l.logger.Error(err.Error())
}
l.statusManager.SetStatus(constants.Completed)
case err := <-errorCh:
if errors.Is(err, ipinfo.ErrTooManyRequests) {
l.logger.Warn(err.Error())
l.statusManager.SetStatus(constants.Crashed)
// This break only leaves the select: stayHere is still true so
// the inner loop keeps waiting without retrying the fetch.
break
}
getCancel()
close(resultCh)
l.statusManager.SetStatus(constants.Crashed)
l.logAndWait(ctx, err)
stayHere = false
}
}
close(errorCh)
}
}

View File

@@ -1,16 +0,0 @@
package publicip
import (
"context"
"github.com/qdm12/gluetun/internal/configuration/settings"
)
// GetSettings returns the public IP settings held by the loop state.
func (l *Loop) GetSettings() (settings settings.PublicIP) {
	settings = l.state.GetSettings()
	return settings
}
// SetSettings forwards the settings to the loop state and returns
// the outcome message produced by the state.
func (l *Loop) SetSettings(ctx context.Context, settings settings.PublicIP) (
	outcome string) {
	outcome = l.state.SetSettings(ctx, settings)
	return outcome
}

View File

@@ -1,17 +0,0 @@
package state
import (
"github.com/qdm12/gluetun/internal/models"
)
// GetData returns a copy (made with Copy) of the stored public IP
// data, read under the IP data read lock.
func (s *State) GetData() (data models.PublicIP) {
	s.ipDataMu.RLock()
	defer s.ipDataMu.RUnlock()

	data = s.ipData.Copy()
	return data
}
// SetData stores a copy (made with Copy) of the given public IP
// data, written under the IP data lock.
func (s *State) SetData(data models.PublicIP) {
	// data is a by-value argument, so it can be copied before locking.
	snapshot := data.Copy()

	s.ipDataMu.Lock()
	defer s.ipDataMu.Unlock()
	s.ipData = snapshot
}

View File

@@ -1,35 +0,0 @@
package state
import (
"context"
"reflect"
"github.com/qdm12/gluetun/internal/configuration/settings"
)
// GetSettings returns the stored public IP settings,
// read under the settings read lock.
func (s *State) GetSettings() (settings settings.PublicIP) {
	s.settingsMu.RLock()
	settings = s.settings
	s.settingsMu.RUnlock()
	return settings
}
// SetSettings replaces the stored settings with the given ones.
// It returns "settings left unchanged" when the given settings are deeply
// equal to the stored ones, and "settings updated" otherwise. When the
// period value changed, a signal is sent on the update ticker channel
// after the settings lock has been released; that send blocks until
// it is received. The context argument is unused.
func (s *State) SetSettings(_ context.Context, settings settings.PublicIP) (
	outcome string) {
	s.settingsMu.Lock()
	settingsUnchanged := reflect.DeepEqual(s.settings, settings)
	if settingsUnchanged {
		s.settingsMu.Unlock()
		return "settings left unchanged"
	}

	// Period is a pointer: compare the pointed durations and not the
	// addresses, which differ as soon as the settings come from another
	// settings value even if the period is the same.
	periodChanged := !reflect.DeepEqual(s.settings.Period, settings.Period)
	s.settings = settings
	s.settingsMu.Unlock()

	if periodChanged {
		s.updateTicker <- struct{}{}
		// TODO blocking
	}
	return "settings updated"
}

View File

@@ -1,36 +0,0 @@
package state
import (
"context"
"sync"
"github.com/qdm12/gluetun/internal/configuration/settings"
"github.com/qdm12/gluetun/internal/models"
)
// New creates a state holding the given status applier, settings
// and update ticker channel. The IP data starts as its zero value.
func New(statusApplier StatusApplier,
	settings settings.PublicIP,
	updateTicker chan<- struct{}) *State {
	s := new(State)
	s.statusApplier = statusApplier
	s.settings = settings
	s.updateTicker = updateTicker
	return s
}
// State holds the public IP settings and the public IP data, each with
// its own read-write mutex, together with a status applier and the channel
// used to signal that the update ticker has to be adjusted.
type State struct {
statusApplier StatusApplier
settings settings.PublicIP
settingsMu sync.RWMutex
ipData models.PublicIP
ipDataMu sync.RWMutex
updateTicker chan<- struct{}
}
// StatusApplier applies a loop status and returns an outcome
// message, or an error.
type StatusApplier interface {
ApplyStatus(ctx context.Context, status models.LoopStatus) (
outcome string, err error)
}

View File

@@ -1,16 +0,0 @@
package publicip
import (
"context"
"github.com/qdm12/gluetun/internal/models"
)
// GetStatus returns the loop status reported by the status manager.
func (l *Loop) GetStatus() (status models.LoopStatus) {
	status = l.statusManager.GetStatus()
	return status
}
// ApplyStatus forwards the status to apply to the status manager
// and returns its outcome message and error.
func (l *Loop) ApplyStatus(ctx context.Context, status models.LoopStatus) (
	outcome string, err error) {
	outcome, err = l.statusManager.ApplyStatus(ctx, status)
	return outcome, err
}

View File

@@ -1,49 +0,0 @@
package publicip
import (
"context"
"time"
"github.com/qdm12/gluetun/internal/constants"
)
// RunRestartTicker applies the Running status to the status manager
// every time the period timer fires, and adjusts the timer when a signal is
// received on the update ticker channel. It closes done when it returns,
// which happens when ctx is done.
func (l *Loop) RunRestartTicker(ctx context.Context, done chan<- struct{}) {
defer close(done)
timer := time.NewTimer(time.Hour)
timer.Stop() // 1 hour, cannot be a race condition
timerIsStopped := true
// The timer is only armed at start if the configured period is positive.
if period := *l.state.GetSettings().Period; period > 0 {
timerIsStopped = false
timer.Reset(period)
}
// time.Unix(0, 0) marks that the timer has not ticked yet.
lastTick := time.Unix(0, 0)
for {
select {
case <-ctx.Done():
// Drain the channel if the timer fired while being stopped.
if !timerIsStopped && !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
lastTick = l.timeNow()
_, _ = l.statusManager.ApplyStatus(ctx, constants.Running)
timer.Reset(*l.state.GetSettings().Period)
case <-l.updateTicker:
if !timerIsStopped && !timer.Stop() {
<-timer.C
}
timerIsStopped = true
period := *l.state.GetSettings().Period
// A period of zero leaves the timer stopped.
if period == 0 {
continue
}
// The time already elapsed since the last tick is subtracted
// from the new period.
var waited time.Duration
if lastTick.UnixNano() > 0 {
waited = l.timeNow().Sub(lastTick)
}
leftToWait := period - waited
timer.Reset(leftToWait)
timerIsStopped = false
}
}
}

View File

@@ -0,0 +1,82 @@
package publicip
import (
"fmt"
"os"
"time"
"github.com/qdm12/gluetun/internal/configuration/settings"
)
// update merges partialUpdate into the current settings, validates the
// result and applies it:
//   - if the IP file path changed, the IP is written to the new path
//     (previous path empty), the previous file is removed (new path empty)
//     or the file is renamed (both paths set);
//   - if the period changed, the timer is adjusted with updateTimer;
//   - the new settings are finally stored under the settings lock.
//
// lastTick is the time of the last fetch and timerIsReadyToReset is the
// current timer state; the possibly changed timer state is returned.
// On error, the settings and the timer are left untouched and the given
// timerIsReadyToReset is returned unchanged.
func (l *Loop) update(partialUpdate settings.PublicIP,
	lastTick time.Time, timer *time.Timer, timerIsReadyToReset bool) (
	newTimerIsReadyToReset bool, err error) {
	// No need to lock the settings mutex for reading since the settings
	// can only be written in the code below in this goroutine.
	updatedSettings, err := l.settings.UpdateWith(partialUpdate)
	if err != nil {
		return timerIsReadyToReset, err
	}

	// Run the operations which can fail first, so a failed update
	// does not leave the timer configured with the rejected period.
	previousFilepath := *l.settings.IPFilepath
	newFilepath := *updatedSettings.IPFilepath
	if previousFilepath != newFilepath {
		switch {
		case previousFilepath == "":
			// The IP data can be written by other goroutines (for
			// example through ClearData), so read it under its lock.
			l.ipDataMutex.RLock()
			ip := l.ipData.IP.String()
			l.ipDataMutex.RUnlock()
			err = persistPublicIP(newFilepath, ip, l.puid, l.pgid)
			if err != nil {
				return timerIsReadyToReset, fmt.Errorf("persisting ip data: %w", err)
			}
		case newFilepath == "":
			err = os.Remove(previousFilepath)
			if err != nil {
				return timerIsReadyToReset, fmt.Errorf("removing ip data file path: %w", err)
			}
		default:
			err = os.Rename(previousFilepath, newFilepath)
			if err != nil {
				return timerIsReadyToReset, fmt.Errorf("renaming ip data file path: %w", err)
			}
		}
	}

	newTimerIsReadyToReset = timerIsReadyToReset
	if *l.settings.Period != *updatedSettings.Period {
		newTimerIsReadyToReset = l.updateTimer(*updatedSettings.Period, lastTick,
			timer, timerIsReadyToReset)
	}

	l.settingsMutex.Lock()
	l.settings = updatedSettings
	l.settingsMutex.Unlock()

	return newTimerIsReadyToReset, nil
}
// updateTimer configures timer for the given period.
//
// timerIsReadyToReset must be true when the timer is not armed, that is
// when it is stopped or when it fired and its tick has been received.
// A period of zero disables the timer. Otherwise the timer is armed to
// fire once period has elapsed since lastFetch, or almost immediately if
// that moment is already in the past. A lastFetch of time.Unix(0, 0)
// means no fetch happened yet, so the full period is waited.
//
// It returns true if the timer is left unarmed and false if it is armed.
func (l *Loop) updateTimer(period time.Duration, lastFetch time.Time,
	timer *time.Timer, timerIsReadyToReset bool) (newTimerIsReadyToReset bool) {
	// Only an armed timer needs stopping. Stopping a timer which is
	// already stopped or drained reports false with nothing left to read
	// from its channel, so a blocking receive there would hang forever.
	if !timerIsReadyToReset && !timer.Stop() {
		// The timer fired but its tick has not been received yet:
		// discard it, without blocking, so it is not seen after a reset.
		select {
		case <-timer.C:
		default:
		}
	}

	disableTimer := period == 0
	if disableTimer {
		return true
	}

	var waited time.Duration
	if lastFetch.UnixNano() > 0 {
		waited = l.timeNow().Sub(lastFetch)
	}
	leftToWait := period - waited
	if leftToWait <= 0 {
		leftToWait = time.Nanosecond
	}
	timer.Reset(leftToWait)
	return false
}

View File

@@ -3,8 +3,6 @@ package vpn
import (
"context"
"errors"
"github.com/qdm12/gluetun/internal/models"
)
func (l *Loop) cleanup(vpnProvider string) {
@@ -15,9 +13,12 @@ func (l *Loop) cleanup(vpnProvider string) {
}
}
l.publicip.SetData(models.PublicIP{}) // clear public IP address data
err := l.publicip.ClearData()
if err != nil {
l.logger.Error("clearing public IP data: " + err.Error())
}
err := l.stopPortForwarding(vpnProvider)
err = l.stopPortForwarding(vpnProvider)
if err != nil {
portForwardingAlreadyStopped := errors.Is(err, context.Canceled)
if !portForwardingAlreadyStopped {

View File

@@ -90,7 +90,6 @@ type DNSLoop interface {
}
// PublicIPLoop lists the public IP loop methods needed by the code
// declaring this interface (presumably the VPN loop — confirm against
// the callers).
// NOTE(review): this listing appears to show removed methods (ApplyStatus,
// SetData) together with added ones (StartSingleRun, ClearData) — confirm
// against the real file.
type PublicIPLoop interface {
ApplyStatus(ctx context.Context, status models.LoopStatus) (
outcome string, err error)
SetData(data models.PublicIP)
StartSingleRun()
ClearData() (err error)
}

View File

@@ -28,8 +28,8 @@ func (l *Loop) onTunnelUp(ctx context.Context, data tunnelUpData) {
_, _ = l.dnsLooper.ApplyStatus(ctx, constants.Running)
}
// Runs the Public IP getter job once
_, _ = l.publicip.ApplyStatus(ctx, constants.Running)
l.publicip.StartSingleRun()
if l.versionInfo {
l.versionInfo = false // only get the version information once
message, err := version.GetMessage(ctx, l.buildInfo, l.client)