Add cron tick of command queue

This commit is contained in:
Marc Di Luzio 2020-06-06 15:52:03 +01:00
parent 0a0a32cf58
commit 573bfbf9c7
7 changed files with 113 additions and 13 deletions

1
go.mod
View file

@ -5,5 +5,6 @@ go 1.14
require ( require (
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.6.0 github.com/stretchr/testify v1.6.0
) )

2
go.sum
View file

@ -6,6 +6,8 @@ github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=

View file

@ -26,6 +26,9 @@ func TestCommand_Move(t *testing.T) {
moveCommand := Command{Command: CommandMove, Bearing: bearing.String(), Duration: duration} moveCommand := Command{Command: CommandMove, Bearing: bearing.String(), Duration: duration}
assert.NoError(t, world.Enqueue(a, moveCommand), "Failed to execute move command") assert.NoError(t, world.Enqueue(a, moveCommand), "Failed to execute move command")
// Tick the world
world.ExecuteCommandQueues()
newpos, err := world.RoverPosition(a) newpos, err := world.RoverPosition(a)
assert.NoError(t, err, "Failed to set position for rover") assert.NoError(t, err, "Failed to set position for rover")
pos.Add(Vector{0.0, duration * attribs.Speed}) // We should have moved duration*speed north pos.Add(Vector{0.0, duration * attribs.Speed}) // We should have moved duration*speed north

View file

@ -204,7 +204,7 @@ func (w *World) Enqueue(rover uuid.UUID, commands ...Command) error {
} }
// Execute will execute any commands in the current command queue // Execute will execute any commands in the current command queue
func (w *World) Execute() error { func (w *World) ExecuteCommandQueues() {
w.cmdMutex.Lock() w.cmdMutex.Lock()
defer w.cmdMutex.Unlock() defer w.cmdMutex.Unlock()
@ -231,14 +231,10 @@ func (w *World) Execute() error {
delete(w.CommandQueue, rover) delete(w.CommandQueue, rover)
} }
} }
return nil
} }
// ExecuteCommand will execute a single command // ExecuteCommand will execute a single command
func (w *World) ExecuteCommand(c *Command, rover uuid.UUID) (finished bool, err error) { func (w *World) ExecuteCommand(c *Command, rover uuid.UUID) (finished bool, err error) {
w.worldMutex.Lock()
defer w.worldMutex.Unlock()
switch c.Command { switch c.Command {
case "move": case "move":
@ -263,3 +259,27 @@ func (w *World) ExecuteCommand(c *Command, rover uuid.UUID) (finished bool, err
return return
} }
// RLock read locks the world
func (w *World) RLock() {
w.worldMutex.RLock()
w.cmdMutex.RLock()
}
// RUnlock read unlocks the world
func (w *World) RUnlock() {
w.worldMutex.RUnlock()
w.cmdMutex.RUnlock()
}
// Lock locks the world
func (w *World) Lock() {
w.worldMutex.Lock()
w.cmdMutex.Lock()
}
// Unlock unlocks the world
func (w *World) Unlock() {
w.worldMutex.Unlock()
w.cmdMutex.Unlock()
}

View file

@ -84,7 +84,12 @@ func HandleRegister(s *Server, vars map[string]string, b io.ReadCloser, w io.Wri
} else if acc, err := s.accountant.RegisterAccount(data.Name); err != nil { } else if acc, err := s.accountant.RegisterAccount(data.Name); err != nil {
response.Error = err.Error() response.Error = err.Error()
} else if err := s.SaveAll(); err != nil {
response.Error = fmt.Sprintf("Internal server error when saving accounts: %s", err)
} else { } else {
// Save out the new accounts
response.Id = acc.Id.String() response.Id = acc.Id.String()
response.Success = true response.Success = true
} }

View file

@ -124,6 +124,9 @@ func TestHandleCommand(t *testing.T) {
attrib, err := s.world.RoverAttributes(inst) attrib, err := s.world.RoverAttributes(inst)
assert.NoError(t, err, "Couldn't get rover attribs") assert.NoError(t, err, "Couldn't get rover attribs")
// Tick the command queues to progress the move command
s.world.ExecuteCommandQueues()
pos2, err := s.world.RoverPosition(inst) pos2, err := s.world.RoverPosition(inst)
assert.NoError(t, err, "Couldn't get rover position") assert.NoError(t, err, "Couldn't get rover position")
pos.Add(game.Vector{X: 0.0, Y: attrib.Speed * 1}) // Should have moved north by the speed and duration pos.Add(game.Vector{X: 0.0, Y: attrib.Speed * 1}) // Should have moved north by the speed and duration

View file

@ -15,6 +15,7 @@ import (
"github.com/mdiluz/rove/pkg/accounts" "github.com/mdiluz/rove/pkg/accounts"
"github.com/mdiluz/rove/pkg/game" "github.com/mdiluz/rove/pkg/game"
"github.com/mdiluz/rove/pkg/persistence" "github.com/mdiluz/rove/pkg/persistence"
"github.com/robfig/cron"
) )
const ( const (
@ -27,19 +28,25 @@ const (
// Server contains the relevant data to run a game server // Server contains the relevant data to run a game server
type Server struct { type Server struct {
address string
// Internal state
accountant *accounts.Accountant accountant *accounts.Accountant
world *game.World world *game.World
// HTTP server
listener net.Listener listener net.Listener
server *http.Server server *http.Server
router *mux.Router
router *mux.Router // Config settings
address string
persistence int persistence int
// sync point for sub-threads
sync sync.WaitGroup sync sync.WaitGroup
// cron schedule for world ticks
schedule *cron.Cron
} }
// ServerOption defines a server creation option // ServerOption defines a server creation option
@ -69,6 +76,7 @@ func NewServer(opts ...ServerOption) *Server {
address: "", address: "",
persistence: EphemeralData, persistence: EphemeralData,
router: router, router: router,
schedule: cron.New(),
} }
// Apply all options // Apply all options
@ -93,10 +101,8 @@ func (s *Server) Initialise() (err error) {
s.sync.Add(1) s.sync.Add(1)
// Load the accounts if requested // Load the accounts if requested
if s.persistence == PersistentData { if err := s.LoadAll(); err != nil {
if err := persistence.LoadAll("accounts", &s.accountant, "world", &s.world); err != nil { return err
return err
}
} }
// Set up the handlers // Set up the handlers
@ -122,6 +128,20 @@ func (s *Server) Addr() string {
func (s *Server) Run() { func (s *Server) Run() {
defer s.sync.Done() defer s.sync.Done()
// Set up the schedule
s.schedule.AddFunc("0,30", func() {
// Ensure we don't quit during this function
s.sync.Add(1)
defer s.sync.Done()
// Run the command queues
s.world.ExecuteCommandQueues()
// Save out the new world state
s.SaveWorld()
})
s.schedule.Start()
// Serve the http requests // Serve the http requests
if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed { if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed {
log.Fatal(err) log.Fatal(err)
@ -130,6 +150,9 @@ func (s *Server) Run() {
// Close closes up the server // Close closes up the server
func (s *Server) Close() error { func (s *Server) Close() error {
// Stop the cron
s.schedule.Stop()
// Try and shut down the http server // Try and shut down the http server
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
@ -137,11 +160,42 @@ func (s *Server) Close() error {
return err return err
} }
// Wait until the server is shut down // Wait until the server has shut down
s.sync.Wait() s.sync.Wait()
// Save and return
return s.SaveAll()
}
// SaveWorld will save out the world file
func (s *Server) SaveWorld() error {
if s.persistence == PersistentData {
s.world.RLock()
defer s.world.RUnlock()
if err := persistence.SaveAll("world", s.world); err != nil {
return fmt.Errorf("failed to save out persistent data: %s", err)
}
}
return nil
}
// SaveAccounts will save out the accounts file
func (s *Server) SaveAccounts() error {
if s.persistence == PersistentData {
if err := persistence.SaveAll("accounts", s.accountant); err != nil {
return fmt.Errorf("failed to save out persistent data: %s", err)
}
}
return nil
}
// SaveAll will save out all server files
func (s *Server) SaveAll() error {
// Save the accounts if requested // Save the accounts if requested
if s.persistence == PersistentData { if s.persistence == PersistentData {
s.world.RLock()
defer s.world.RUnlock()
if err := persistence.SaveAll("accounts", s.accountant, "world", s.world); err != nil { if err := persistence.SaveAll("accounts", s.accountant, "world", s.world); err != nil {
return err return err
} }
@ -149,6 +203,18 @@ func (s *Server) Close() error {
return nil return nil
} }
// LoadAll will load all persistent data
func (s *Server) LoadAll() error {
if s.persistence == PersistentData {
s.world.Lock()
defer s.world.Unlock()
if err := persistence.LoadAll("accounts", &s.accountant, "world", &s.world); err != nil {
return err
}
}
return nil
}
// wrapHandler wraps a request handler in http checks // wrapHandler wraps a request handler in http checks
func (s *Server) wrapHandler(method string, handler Handler) func(w http.ResponseWriter, r *http.Request) { func (s *Server) wrapHandler(method string, handler Handler) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {