fix(publicip): abort ip data fetch if vpn context is canceled
- Prevents requesting the public IP address N times after N VPN failures - Fetching runs with a context local to the 'single run' - Single run writes single run result to a channel back to the caller, RunOnce is now blocking
This commit is contained in:
@@ -2,7 +2,6 @@ package publicip
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -10,7 +9,6 @@ import (
|
|||||||
|
|
||||||
"github.com/qdm12/gluetun/internal/configuration/settings"
|
"github.com/qdm12/gluetun/internal/configuration/settings"
|
||||||
"github.com/qdm12/gluetun/internal/models"
|
"github.com/qdm12/gluetun/internal/models"
|
||||||
"github.com/qdm12/gluetun/internal/publicip/api"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Loop struct {
|
type Loop struct {
|
||||||
@@ -30,7 +28,8 @@ type Loop struct {
|
|||||||
// when performing an update
|
// when performing an update
|
||||||
runCtx context.Context //nolint:containedctx
|
runCtx context.Context //nolint:containedctx
|
||||||
runCancel context.CancelFunc
|
runCancel context.CancelFunc
|
||||||
runTrigger chan<- struct{}
|
runTrigger chan<- context.Context
|
||||||
|
runResult <-chan error
|
||||||
updateTrigger chan<- settings.PublicIP
|
updateTrigger chan<- settings.PublicIP
|
||||||
updatedResult <-chan error
|
updatedResult <-chan error
|
||||||
runDone <-chan struct{}
|
runDone <-chan struct{}
|
||||||
@@ -58,21 +57,23 @@ func (l *Loop) Start(_ context.Context) (_ <-chan error, err error) {
|
|||||||
l.runCtx, l.runCancel = context.WithCancel(context.Background())
|
l.runCtx, l.runCancel = context.WithCancel(context.Background())
|
||||||
runDone := make(chan struct{})
|
runDone := make(chan struct{})
|
||||||
l.runDone = runDone
|
l.runDone = runDone
|
||||||
runTrigger := make(chan struct{})
|
runTrigger := make(chan context.Context)
|
||||||
l.runTrigger = runTrigger
|
l.runTrigger = runTrigger
|
||||||
|
runResult := make(chan error)
|
||||||
|
l.runResult = runResult
|
||||||
updateTrigger := make(chan settings.PublicIP)
|
updateTrigger := make(chan settings.PublicIP)
|
||||||
l.updateTrigger = updateTrigger
|
l.updateTrigger = updateTrigger
|
||||||
updatedResult := make(chan error)
|
updatedResult := make(chan error)
|
||||||
l.updatedResult = updatedResult
|
l.updatedResult = updatedResult
|
||||||
|
|
||||||
go l.run(l.runCtx, runDone, runTrigger, updateTrigger, updatedResult)
|
go l.run(l.runCtx, runDone, runTrigger, runResult, updateTrigger, updatedResult)
|
||||||
|
|
||||||
return nil, nil //nolint:nilnil
|
return nil, nil //nolint:nilnil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
||||||
runTrigger <-chan struct{}, updateTrigger <-chan settings.PublicIP,
|
runTrigger <-chan context.Context, runResult chan<- error,
|
||||||
updatedResult chan<- error) {
|
updateTrigger <-chan settings.PublicIP, updatedResult chan<- error) {
|
||||||
defer close(runDone)
|
defer close(runDone)
|
||||||
|
|
||||||
timer := time.NewTimer(time.Hour)
|
timer := time.NewTimer(time.Hour)
|
||||||
@@ -82,10 +83,14 @@ func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
|||||||
lastFetch := time.Unix(0, 0)
|
lastFetch := time.Unix(0, 0)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
singleRunCtx := runCtx
|
||||||
|
var singleRunResult chan<- error
|
||||||
select {
|
select {
|
||||||
case <-runCtx.Done():
|
case <-runCtx.Done():
|
||||||
return
|
return
|
||||||
case <-runTrigger:
|
case singleRunCtx = <-runTrigger:
|
||||||
|
// Note singleRunCtx is canceled if runCtx is canceled.
|
||||||
|
singleRunResult = runResult
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
timerIsReadyToReset = true
|
timerIsReadyToReset = true
|
||||||
case partialUpdate := <-updateTrigger:
|
case partialUpdate := <-updateTrigger:
|
||||||
@@ -95,15 +100,17 @@ func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := l.fetchIPData(runCtx)
|
|
||||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
lastFetch = l.timeNow()
|
lastFetch = l.timeNow()
|
||||||
timerIsReadyToReset = l.updateTimer(*l.settings.Period, lastFetch, timer, timerIsReadyToReset)
|
timerIsReadyToReset = l.updateTimer(*l.settings.Period, lastFetch, timer, timerIsReadyToReset)
|
||||||
|
|
||||||
if errors.Is(err, api.ErrTooManyRequests) {
|
result, err := l.fetcher.FetchInfo(singleRunCtx, netip.Addr{})
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("fetching information: %w", err)
|
||||||
|
if singleRunResult != nil {
|
||||||
|
singleRunResult <- err
|
||||||
|
} else {
|
||||||
|
l.logger.Error(err.Error())
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,42 +124,36 @@ func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
|||||||
|
|
||||||
filepath := *l.settings.IPFilepath
|
filepath := *l.settings.IPFilepath
|
||||||
err = persistPublicIP(filepath, result.IP.String(), l.puid, l.pgid)
|
err = persistPublicIP(filepath, result.IP.String(), l.puid, l.pgid)
|
||||||
if err != nil { // non critical error, which can be fixed with settings updates.
|
if err != nil {
|
||||||
|
err = fmt.Errorf("persisting public ip address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if singleRunResult != nil {
|
||||||
|
singleRunResult <- err
|
||||||
|
} else if err != nil {
|
||||||
l.logger.Error(err.Error())
|
l.logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Loop) fetchIPData(ctx context.Context) (result models.PublicIP, err error) {
|
func (l *Loop) RunOnce(ctx context.Context) (err error) {
|
||||||
// keep retrying since settings updates won't change the
|
singleRunCtx, singleRunCancel := context.WithCancel(ctx)
|
||||||
// behavior of the following code.
|
select {
|
||||||
const defaultBackoffTime = 5 * time.Second
|
case l.runTrigger <- singleRunCtx:
|
||||||
backoffTime := defaultBackoffTime
|
case <-ctx.Done(): // in case writing to run trigger is blocking
|
||||||
for {
|
singleRunCancel()
|
||||||
result, err = l.fetcher.FetchInfo(ctx, netip.Addr{})
|
return ctx.Err()
|
||||||
switch {
|
|
||||||
case err == nil:
|
|
||||||
return result, nil
|
|
||||||
case ctx.Err() != nil:
|
|
||||||
return result, err
|
|
||||||
case errors.Is(err, api.ErrTooManyRequests):
|
|
||||||
l.logger.Warn(err.Error() + "; not retrying.")
|
|
||||||
return result, err
|
|
||||||
}
|
|
||||||
|
|
||||||
l.logger.Error(fmt.Sprintf("%s - retrying in %s", err, backoffTime))
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return result, ctx.Err()
|
|
||||||
case <-time.After(backoffTime):
|
|
||||||
}
|
|
||||||
const backoffTimeMultipler = 2
|
|
||||||
backoffTime *= backoffTimeMultipler
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Loop) StartSingleRun() {
|
select {
|
||||||
l.runTrigger <- struct{}{}
|
case err = <-l.runResult:
|
||||||
|
singleRunCancel()
|
||||||
|
return err
|
||||||
|
case <-l.runCtx.Done():
|
||||||
|
singleRunCancel()
|
||||||
|
<-l.runResult
|
||||||
|
return l.runCtx.Err()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Loop) UpdateWith(partialUpdate settings.PublicIP) (err error) {
|
func (l *Loop) UpdateWith(partialUpdate settings.PublicIP) (err error) {
|
||||||
|
|||||||
@@ -89,6 +89,6 @@ type DNSLoop interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PublicIPLoop interface {
|
type PublicIPLoop interface {
|
||||||
StartSingleRun()
|
RunOnce(ctx context.Context) (err error)
|
||||||
ClearData() (err error)
|
ClearData() (err error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,7 +29,10 @@ func (l *Loop) onTunnelUp(ctx context.Context, data tunnelUpData) {
|
|||||||
_, _ = l.dnsLooper.ApplyStatus(ctx, constants.Running)
|
_, _ = l.dnsLooper.ApplyStatus(ctx, constants.Running)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.publicip.StartSingleRun()
|
err := l.publicip.RunOnce(ctx)
|
||||||
|
if err != nil {
|
||||||
|
l.logger.Error("getting public IP address information: " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
if l.versionInfo {
|
if l.versionInfo {
|
||||||
l.versionInfo = false // only get the version information once
|
l.versionInfo = false // only get the version information once
|
||||||
@@ -41,7 +44,7 @@ func (l *Loop) onTunnelUp(ctx context.Context, data tunnelUpData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := l.startPortForwarding(data)
|
err = l.startPortForwarding(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.logger.Error(err.Error())
|
l.logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user