offline movements

master
Denis Ranneft 1 month ago
parent 4f97cd2b98
commit dd21bff29d

@ -119,6 +119,8 @@ func main() {
} }
hub.SendToHero(heroID, "adventure_log_line", line) hub.SendToHero(heroID, "adventure_log_line", line)
}) })
engine.SetDigestStore(digestStore)
engine.SetHeroSubscriber(hub.IsHeroConnected)
// Hub callbacks: on connect, load hero and register movement; on disconnect, persist. // Hub callbacks: on connect, load hero and register movement; on disconnect, persist.
hub.OnConnect = func(heroID int64) { hub.OnConnect = func(heroID int64) {
@ -130,11 +132,12 @@ func main() {
engine.RegisterHeroMovement(hero) engine.RegisterHeroMovement(hero)
} }
hub.OnDisconnect = func(heroID int64, remainingSameHero int) { hub.OnDisconnect = func(heroID int64, remainingSameHero int) {
engine.HeroSocketDetached(heroID, remainingSameHero == 0) disconnectAt := time.Now()
engine.HeroSocketDetached(heroID, remainingSameHero == 0, disconnectAt)
if remainingSameHero == 0 { if remainingSameHero == 0 {
dctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) dctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
if err := heroStore.SetWsDisconnectedAt(dctx, heroID, time.Now()); err != nil { if err := heroStore.SetWsDisconnectedAt(dctx, heroID, disconnectAt); err != nil {
logger.Warn("set ws_disconnected_at", "hero_id", heroID, "error", err) logger.Warn("set ws_disconnected_at", "hero_id", heroID, "error", err)
} }
} }
@ -177,24 +180,20 @@ func main() {
} }
}() }()
// Start game engine.
go func() {
if err := engine.Run(ctx); err != nil && err != context.Canceled {
logger.Error("game engine error", "error", err)
}
}()
// Record server start time for catch-up gap calculation. // Record server start time for catch-up gap calculation.
serverStartedAt := time.Now() serverStartedAt := time.Now()
offlineSim := game.NewOfflineSimulator(heroStore, logStore, questStore, roadGraph, logger, func() bool { bootstrapSim := game.NewOfflineSimulator(heroStore, logStore, questStore, roadGraph, logger, nil, nil).
return engine.IsTimePaused()
}, engine.HeroHasActiveMovement).
WithCombatTickRate(engine.TickRate()). WithCombatTickRate(engine.TickRate()).
WithRewardStores(gearStore, achievementStore, taskStore). WithRewardStores(gearStore, achievementStore, taskStore).
WithDigestStore(digestStore) WithDigestStore(digestStore)
bootCtx, bootCancel := context.WithTimeout(ctx, 3*time.Minute)
game.BootstrapResidentHeroes(bootCtx, engine, heroStore, bootstrapSim, 500, logger)
bootCancel()
// Start game engine (after resident heroes are registered).
go func() { go func() {
if err := offlineSim.Run(ctx); err != nil && err != context.Canceled { if err := engine.Run(ctx); err != nil && err != context.Canceled {
logger.Error("offline simulator error", "error", err) logger.Error("game engine error", "error", err)
} }
}() }()

@ -77,8 +77,17 @@ type Engine struct {
// npcAlmsHandler runs when the client accepts a wandering merchant offer (WS). // npcAlmsHandler runs when the client accepts a wandering merchant offer (WS).
npcAlmsHandler func(context.Context, int64) error npcAlmsHandler func(context.Context, int64) error
digestStore *storage.OfflineDigestStore
// heroSubscriber reports whether the hero has at least one WebSocket client (optional).
heroSubscriber func(heroID int64) bool
// lastDisconnectedFullSave tracks periodic DB full saves for heroes without a WS subscriber.
lastDisconnectedFullSave map[int64]time.Time
} }
// offlineDisconnectedFullSaveInterval is how often we persist a full hero row when no WS client is connected.
const offlineDisconnectedFullSaveInterval = 30 * time.Second
// NewEngine creates a new game engine with the given tick rate. // NewEngine creates a new game engine with the given tick rate.
func NewEngine(tickRate time.Duration, eventCh chan model.CombatEvent, logger *slog.Logger) *Engine { func NewEngine(tickRate time.Duration, eventCh chan model.CombatEvent, logger *slog.Logger) *Engine {
e := &Engine{ e := &Engine{
@ -89,6 +98,7 @@ func NewEngine(tickRate time.Duration, eventCh chan model.CombatEvent, logger *s
incomingCh: make(chan IncomingMessage, 256), incomingCh: make(chan IncomingMessage, 256),
eventCh: eventCh, eventCh: eventCh,
logger: logger, logger: logger,
lastDisconnectedFullSave: make(map[int64]time.Time),
} }
heap.Init(&e.queue) heap.Init(&e.queue)
return e return e
@ -98,7 +108,24 @@ func (e *Engine) GetMovements(heroId int64) *HeroMovement {
return e.movements[heroId] return e.movements[heroId]
} }
// HeroHasActiveMovement is true while the hero has an in-engine movement session (typically WebSocket-connected). // MergeResidentHeroState copies the authoritative in-engine hero into dst after SyncToHero.
// Returns false if the hero is not resident. Used by REST init so the client sees the same state the Engine simulates.
func (e *Engine) MergeResidentHeroState(dst *model.Hero) bool {
if dst == nil {
return false
}
e.mu.RLock()
hm := e.movements[dst.ID]
e.mu.RUnlock()
if hm == nil || hm.Hero == nil {
return false
}
hm.SyncToHero()
*dst = *hm.Hero
return true
}
// HeroHasActiveMovement is true while the hero has an in-engine movement session (resident world actor).
func (e *Engine) HeroHasActiveMovement(heroID int64) bool { func (e *Engine) HeroHasActiveMovement(heroID int64) bool {
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
@ -272,6 +299,28 @@ func (e *Engine) SetAdventureLog(w AdventureLogWriter) {
e.adventureLog = w e.adventureLog = w
} }
// SetDigestStore wires persistent offline digest accumulation (after disconnect grace).
func (e *Engine) SetDigestStore(d *storage.OfflineDigestStore) {
e.mu.Lock()
defer e.mu.Unlock()
e.digestStore = d
}
// SetHeroSubscriber sets an optional callback: return true if the hero has at least one WebSocket client.
// Used for periodic full saves when the world keeps simulating without a subscriber.
func (e *Engine) SetHeroSubscriber(fn func(heroID int64) bool) {
e.mu.Lock()
defer e.mu.Unlock()
e.heroSubscriber = fn
}
func (e *Engine) applyOfflineDigest(ctx context.Context, heroID int64, hero *model.Hero, now time.Time, delta storage.OfflineDigestDelta) {
if e.digestStore == nil || hero == nil || !OfflineDigestCollecting(hero.WsDisconnectedAt, now) {
return
}
_ = e.digestStore.ApplyDelta(ctx, heroID, delta)
}
// IncomingCh returns the channel for routing client WS commands into the engine. // IncomingCh returns the channel for routing client WS commands into the engine.
func (e *Engine) IncomingCh() chan<- IncomingMessage { func (e *Engine) IncomingCh() chan<- IncomingMessage {
return e.incomingCh return e.incomingCh
@ -594,6 +643,7 @@ func (e *Engine) RegisterHeroMovement(hero *model.Hero) {
// Reconnect while the previous socket is still tearing down: keep live movement so we // Reconnect while the previous socket is still tearing down: keep live movement so we
// do not replace (x,y) and route with a stale DB snapshot. // do not replace (x,y) and route with a stale DB snapshot.
if existing, ok := e.movements[hero.ID]; ok { if existing, ok := e.movements[hero.ID]; ok {
existing.Hero.WsDisconnectedAt = hero.WsDisconnectedAt
existing.Hero.EnsureGearMap() existing.Hero.EnsureGearMap()
existing.Hero.RefreshDerivedCombatStats(now) existing.Hero.RefreshDerivedCombatStats(now)
e.logger.Info("hero movement reattached (existing session)", e.logger.Info("hero movement reattached (existing session)",
@ -621,6 +671,19 @@ func (e *Engine) RegisterHeroMovement(hero *model.Hero) {
hm.MarkTownPausePersisted(hm.townPausePersistSignature()) hm.MarkTownPausePersisted(hm.townPausePersistSignature())
hm.SyncToHero() hm.SyncToHero()
// DB said fighting but engine has no combat (e.g. after restart): attach a new encounter.
if hm.State == model.StateFighting {
if _, exists := e.combats[hero.ID]; !exists {
en := PickEnemyForLevel(hero.Level)
if en.Slug != "" {
e.startCombatLocked(hm.Hero, &en)
} else {
hm.State = model.StateWalking
hm.Hero.State = model.StateWalking
}
}
}
e.logger.Info("hero movement registered", e.logger.Info("hero movement registered",
"hero_id", hero.ID, "hero_id", hero.ID,
"state", hm.State, "state", hm.State,
@ -647,15 +710,16 @@ func (e *Engine) RegisterHeroMovement(hero *model.Hero) {
} }
} }
// HeroSocketDetached persists hero state on every WS disconnect and removes in-memory // HeroSocketDetached persists hero state on every WS disconnect. Movement and combat stay in the engine
// movement only when lastConnection is true (no other tabs/sockets for this hero). // so the world keeps simulating; disconnectedAt is stored on the in-memory hero for offline digest timing.
func (e *Engine) HeroSocketDetached(heroID int64, lastConnection bool) { func (e *Engine) HeroSocketDetached(heroID int64, lastConnection bool, disconnectedAt time.Time) {
e.mu.Lock() e.mu.Lock()
hm, ok := e.movements[heroID] hm, ok := e.movements[heroID]
if ok { if ok {
hm.SyncToHero() hm.SyncToHero()
if lastConnection { if lastConnection && !disconnectedAt.IsZero() && hm.Hero != nil {
delete(e.movements, heroID) t := disconnectedAt
hm.Hero.WsDisconnectedAt = &t
} }
} }
var heroSnap *model.Hero var heroSnap *model.Hero
@ -1225,11 +1289,15 @@ func (e *Engine) GetCombat(heroID int64) (*model.CombatState, bool) {
return cs, ok return cs, ok
} }
// processCombatTick is the 100ms combat processing tick. // processCombatTick is the combat processing tick (typically 100ms cadence).
func (e *Engine) processCombatTick(now time.Time) { func (e *Engine) processCombatTick(now time.Time) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
e.processCombatTickLocked(now)
}
// processCombatTickLocked runs combat logic; caller must hold e.mu.
func (e *Engine) processCombatTickLocked(now time.Time) {
// Heroes resting or touring town must not keep fighting in the background. // Heroes resting or touring town must not keep fighting in the background.
var purgeCombat []int64 var purgeCombat []int64
for heroID := range e.combats { for heroID := range e.combats {
@ -1299,6 +1367,9 @@ func (e *Engine) processCombatTick(now time.Time) {
if hm, ok := e.movements[heroID]; ok { if hm, ok := e.movements[heroID]; ok {
hm.Die() hm.Die()
} }
dctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
e.applyOfflineDigest(dctx, heroID, cs.Hero, now, storage.OfflineDigestDelta{Deaths: 1})
cancel()
delete(e.combats, heroID) delete(e.combats, heroID)
} }
} }
@ -1462,6 +1533,9 @@ func (e *Engine) processEnemyAttack(cs *model.CombatState, now time.Time) {
if hm, ok := e.movements[cs.HeroID]; ok { if hm, ok := e.movements[cs.HeroID]; ok {
hm.Die() hm.Die()
} }
dctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
e.applyOfflineDigest(dctx, cs.HeroID, cs.Hero, now, storage.OfflineDigestDelta{Deaths: 1})
cancel()
delete(e.combats, cs.HeroID) delete(e.combats, cs.HeroID)
e.logger.Info("hero died", e.logger.Info("hero died",
@ -1515,6 +1589,18 @@ func (e *Engine) handleEnemyDeath(cs *model.CombatState, now time.Time) {
victoryDrops = e.onEnemyDeath(hero, enemy, now) victoryDrops = e.onEnemyDeath(hero, enemy, now)
} }
if hero != nil {
dctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
e.applyOfflineDigest(dctx, cs.HeroID, hero, now, storage.OfflineDigestDelta{
MonstersKilled: 1,
XPGained: enemy.XPReward,
GoldGained: model.SumGoldFromLootDrops(victoryDrops),
LevelsGained: hero.Level - oldLevel,
LootAppend: NonGoldLootForDigest(victoryDrops),
})
cancel()
}
e.emitEvent(model.CombatEvent{ e.emitEvent(model.CombatEvent{
Type: "combat_end", Type: "combat_end",
HeroID: cs.HeroID, HeroID: cs.HeroID,
@ -1572,6 +1658,50 @@ func (e *Engine) handleEnemyDeath(cs *model.CombatState, now time.Time) {
) )
} }
// processAutoReviveLocked revives dead heroes after AutoReviveAfterMs downtime. Caller holds e.mu.
func (e *Engine) processAutoReviveLocked(now time.Time) {
if e.heroStore == nil {
return
}
gap := time.Duration(tuning.Get().AutoReviveAfterMs) * time.Millisecond
for heroID, hm := range e.movements {
if hm == nil || hm.Hero == nil {
continue
}
h := hm.Hero
if h.State != model.StateDead && h.HP > 0 {
continue
}
if now.Sub(h.UpdatedAt) <= gap {
continue
}
h.HP = int(float64(h.MaxHP) * tuning.Get().ReviveHpPercent)
if h.HP < 1 {
h.HP = 1
}
h.State = model.StateWalking
h.Debuffs = nil
hm.State = model.StateWalking
hm.SyncToHero()
dctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
e.applyOfflineDigest(dctx, heroID, h, now, storage.OfflineDigestDelta{Revives: 1})
cancel()
if e.adventureLog != nil {
e.adventureLog(heroID, model.AdventureLogLine{
Event: &model.AdventureLogEvent{
Code: model.LogAutoReviveAfterSec,
Args: map[string]any{"seconds": int64(gap.Round(time.Second) / time.Second)},
},
})
}
ctx, cancelSave := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.heroStore.Save(ctx, h); err != nil && e.logger != nil {
e.logger.Error("persist hero after auto-revive", "hero_id", heroID, "error", err)
}
cancelSave()
}
}
// processMovementTick advances all walking heroes and checks for encounters. // processMovementTick advances all walking heroes and checks for encounters.
// Runs on the configured movement cadence. // Runs on the configured movement cadence.
func (e *Engine) processMovementTick(now time.Time) { func (e *Engine) processMovementTick(now time.Time) {
@ -1582,6 +1712,8 @@ func (e *Engine) processMovementTick(now time.Time) {
return return
} }
e.processAutoReviveLocked(now)
startCombat := func(hm *HeroMovement, enemy *model.Enemy, t time.Time) { startCombat := func(hm *HeroMovement, enemy *model.Enemy, t time.Time) {
e.startCombatLocked(hm.Hero, enemy) e.startCombatLocked(hm.Hero, enemy)
} }
@ -1612,6 +1744,21 @@ func (e *Engine) processMovementTick(now time.Time) {
hm.MarkTownPausePersisted(sig) hm.MarkTownPausePersisted(sig)
e.syncTownSessionRedis(heroID, hm) e.syncTownSessionRedis(heroID, hm)
} }
if e.heroStore != nil && e.heroSubscriber != nil && hm.Hero != nil && !e.heroSubscriber(heroID) {
last := e.lastDisconnectedFullSave[heroID]
if last.IsZero() || now.Sub(last) >= offlineDisconnectedFullSaveInterval {
hm.SyncToHero()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.heroStore.Save(ctx, hm.Hero); err != nil {
if e.logger != nil {
e.logger.Error("persist disconnected resident hero", "hero_id", heroID, "error", err)
}
} else {
e.lastDisconnectedFullSave[heroID] = now
}
cancel()
}
}
} }
} }

@ -13,9 +13,19 @@ import (
"github.com/denisovdennis/autohero/internal/tuning" "github.com/denisovdennis/autohero/internal/tuning"
) )
// OfflineSimulator runs periodic background ticks for heroes that are offline, // OfflineDigestGrace is the delay after the last WS disconnect before offline events count toward the digest.
// advancing movement the same way as the online engine (without WebSocket payloads) const OfflineDigestGrace = 30 * time.Second
// and resolving random encounters with SimulateOneFight.
// OfflineDigestCollecting is true when digest deltas should be applied (disconnect + grace elapsed).
func OfflineDigestCollecting(disconnect *time.Time, now time.Time) bool {
if disconnect == nil {
return false
}
return !now.Before(disconnect.Add(OfflineDigestGrace))
}
// OfflineSimulator holds dependencies for one-shot wall-time catch-up (server downtime, cold-start bootstrap).
// Live progression runs in the Engine for all resident heroes.
type OfflineSimulator struct { type OfflineSimulator struct {
store *storage.HeroStore store *storage.HeroStore
logStore *storage.LogStore logStore *storage.LogStore
@ -35,9 +45,8 @@ type OfflineSimulator struct {
digestStore *storage.OfflineDigestStore digestStore *storage.OfflineDigestStore
} }
// NewOfflineSimulator creates a new OfflineSimulator that ticks every 30 seconds. // NewOfflineSimulator builds a catch-up runner used by BootstrapResidentHeroes and REST init gap recovery.
// isPaused may be nil; if it returns true, offline catch-up is skipped (aligned with engine pause). // isPaused and skipIfLive are optional filters for SimulateHeroAt callers; Run() is a no-op.
// skipIfLive may be nil; if it returns true for a hero id, that hero is skipped this tick.
func NewOfflineSimulator(store *storage.HeroStore, logStore *storage.LogStore, questStore *storage.QuestStore, graph *RoadGraph, logger *slog.Logger, isPaused func() bool, skipIfLive func(heroID int64) bool) *OfflineSimulator { func NewOfflineSimulator(store *storage.HeroStore, logStore *storage.LogStore, questStore *storage.QuestStore, graph *RoadGraph, logger *slog.Logger, isPaused func() bool, skipIfLive func(heroID int64) bool) *OfflineSimulator {
return &OfflineSimulator{ return &OfflineSimulator{
store: store, store: store,
@ -75,8 +84,8 @@ func (s *OfflineSimulator) WithDigestStore(d *storage.OfflineDigestStore) *Offli
return s return s
} }
// nonGoldLootForDigest keeps equipment/potion lines only; gold belongs in gold_gained counter. // NonGoldLootForDigest keeps equipment/potion lines only; gold belongs in gold_gained counter.
func nonGoldLootForDigest(drops []model.LootDrop) []model.LootDrop { func NonGoldLootForDigest(drops []model.LootDrop) []model.LootDrop {
if len(drops) == 0 { if len(drops) == 0 {
return nil return nil
} }
@ -93,58 +102,18 @@ func nonGoldLootForDigest(drops []model.LootDrop) []model.LootDrop {
return out return out
} }
// Run starts the offline simulation loop. It blocks until the context is cancelled. // Run is a no-op waiter: progression runs in the game Engine for all resident heroes.
// Kept so callers can block on the same context lifecycle as before.
func (s *OfflineSimulator) Run(ctx context.Context) error { func (s *OfflineSimulator) Run(ctx context.Context) error {
ticker := time.NewTicker(s.interval) <-ctx.Done()
defer ticker.Stop() if s.logger != nil {
s.logger.Info("offline simulator stub shutting down (engine-authoritative world)")
s.logger.Info("offline simulator started", "interval", s.interval)
for {
select {
case <-ctx.Done():
s.logger.Info("offline simulator shutting down")
return ctx.Err()
case <-ticker.C:
s.processTick(ctx)
}
}
}
// processTick finds all offline heroes and simulates one fight for each.
func (s *OfflineSimulator) processTick(ctx context.Context) {
if s.isPaused != nil && s.isPaused() {
return
}
heroes, err := s.store.ListOfflineHeroes(ctx, s.interval*2, 100)
if err != nil {
s.logger.Error("offline simulator: failed to list offline heroes", "error", err)
return
}
if len(heroes) == 0 {
return
}
s.logger.Debug("offline simulator tick", "offline_heroes", len(heroes))
for _, hero := range heroes {
if s.skipIfLive != nil && s.skipIfLive(hero.ID) {
continue
}
if err := s.simulateHeroTick(ctx, hero, time.Now(), true); err != nil {
s.logger.Error("offline simulator: hero tick failed",
"hero_id", hero.ID,
"error", err,
)
// Continue with other heroes — don't crash on one failure.
}
} }
return ctx.Err()
} }
// simulateHeroTick catches up movement in configured movement-tick steps from hero.UpdatedAt to now, // simulateHeroTick catches up movement in configured movement-tick steps from hero.UpdatedAt to now,
// then persists. Random encounters use the same rolls as online; combat is resolved // then persists. Encounters resolve combat via SimulateOneFight (batch-only; live play uses Engine combat).
// synchronously via SimulateOneFight (no WebSocket).
func (s *OfflineSimulator) simulateHeroTick(ctx context.Context, hero *model.Hero, now time.Time, persist bool) error { func (s *OfflineSimulator) simulateHeroTick(ctx context.Context, hero *model.Hero, now time.Time, persist bool) error {
// Auto-revive after configured downtime (autoReviveAfterMs). // Auto-revive after configured downtime (autoReviveAfterMs).
@ -162,7 +131,7 @@ func (s *OfflineSimulator) simulateHeroTick(ctx context.Context, hero *model.Her
Args: map[string]any{"seconds": int64(gap.Round(time.Second) / time.Second)}, Args: map[string]any{"seconds": int64(gap.Round(time.Second) / time.Second)},
}, },
}) })
if s.digestStore != nil { if s.digestStore != nil && OfflineDigestCollecting(hero.WsDisconnectedAt, now) {
_ = s.digestStore.ApplyDelta(ctx, hero.ID, storage.OfflineDigestDelta{Revives: 1}) _ = s.digestStore.ApplyDelta(ctx, hero.ID, storage.OfflineDigestDelta{Revives: 1})
} }
} }
@ -198,7 +167,7 @@ func (s *OfflineSimulator) simulateHeroTick(ctx context.Context, hero *model.Her
rewardDeps := s.rewardDeps(tickNow) rewardDeps := s.rewardDeps(tickNow)
levelBefore := hm.Hero.Level levelBefore := hm.Hero.Level
survived, en, xpGained, goldGained, drops := SimulateOneFight(hm.Hero, tickNow, enemy, s.graph, s.combatTickRate, rewardDeps) survived, en, xpGained, goldGained, drops := SimulateOneFight(hm.Hero, tickNow, enemy, s.graph, s.combatTickRate, rewardDeps)
if s.digestStore != nil { if s.digestStore != nil && OfflineDigestCollecting(hm.Hero.WsDisconnectedAt, tickNow) {
if survived { if survived {
levelGain := hm.Hero.Level - levelBefore levelGain := hm.Hero.Level - levelBefore
_ = s.digestStore.ApplyDelta(ctx, hm.Hero.ID, storage.OfflineDigestDelta{ _ = s.digestStore.ApplyDelta(ctx, hm.Hero.ID, storage.OfflineDigestDelta{
@ -206,7 +175,7 @@ func (s *OfflineSimulator) simulateHeroTick(ctx context.Context, hero *model.Her
XPGained: xpGained, XPGained: xpGained,
GoldGained: goldGained, GoldGained: goldGained,
LevelsGained: levelGain, LevelsGained: levelGain,
LootAppend: nonGoldLootForDigest(drops), LootAppend: NonGoldLootForDigest(drops),
}) })
} else { } else {
_ = s.digestStore.ApplyDelta(ctx, hm.Hero.ID, storage.OfflineDigestDelta{Deaths: 1}) _ = s.digestStore.ApplyDelta(ctx, hm.Hero.ID, storage.OfflineDigestDelta{Deaths: 1})

@ -7,6 +7,21 @@ import (
"github.com/denisovdennis/autohero/internal/model" "github.com/denisovdennis/autohero/internal/model"
) )
func TestOfflineDigestCollecting(t *testing.T) {
now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
recent := now.Add(-20 * time.Second)
if OfflineDigestCollecting(&recent, now) {
t.Error("expected false before grace window")
}
old := now.Add(-2 * time.Minute)
if !OfflineDigestCollecting(&old, now) {
t.Error("expected true after grace window")
}
if OfflineDigestCollecting(nil, now) {
t.Error("expected false when disconnect time is nil")
}
}
func TestSimulateOneFight_HeroSurvives(t *testing.T) { func TestSimulateOneFight_HeroSurvives(t *testing.T) {
hero := &model.Hero{ hero := &model.Hero{
Level: 1, XP: 0, Level: 1, XP: 0,
@ -122,14 +137,14 @@ func TestNonGoldLootForDigest(t *testing.T) {
{ItemType: "potion", Rarity: model.RarityCommon}, {ItemType: "potion", Rarity: model.RarityCommon},
{ItemType: "gold", Rarity: model.RarityCommon, GoldAmount: 5}, {ItemType: "gold", Rarity: model.RarityCommon, GoldAmount: 5},
} }
out := nonGoldLootForDigest(drops) out := NonGoldLootForDigest(drops)
if len(out) != 1 || out[0].ItemType != "potion" { if len(out) != 1 || out[0].ItemType != "potion" {
t.Fatalf("want single potion line, got %#v", out) t.Fatalf("want single potion line, got %#v", out)
} }
if nonGoldLootForDigest(nil) != nil { if NonGoldLootForDigest(nil) != nil {
t.Fatal("nil in -> nil out") t.Fatal("nil in -> nil out")
} }
if nonGoldLootForDigest([]model.LootDrop{{ItemType: "gold", GoldAmount: 1}}) != nil { if NonGoldLootForDigest([]model.LootDrop{{ItemType: "gold", GoldAmount: 1}}) != nil {
t.Fatal("gold-only -> nil") t.Fatal("gold-only -> nil")
} }
} }

@ -193,6 +193,11 @@ func (h *GameHandler) GetHero(w http.ResponseWriter, r *http.Request) {
h.logger.Warn("failed to persist buff charges init/rollover", "hero_id", hero.ID, "error", err) h.logger.Warn("failed to persist buff charges init/rollover", "hero_id", hero.ID, "error", err)
} }
} }
if h.engine != nil && !h.engine.IsTimePaused() && h.engine.MergeResidentHeroState(hero) {
if err := h.store.Save(r.Context(), hero); err != nil {
h.logger.Warn("failed to persist engine-merged hero on get", "hero_id", hero.ID, "error", err)
}
}
hero.RefreshDerivedCombatStats(now) hero.RefreshDerivedCombatStats(now)
writeHeroJSON(w, http.StatusOK, hero) writeHeroJSON(w, http.StatusOK, hero)
} }
@ -757,6 +762,10 @@ func (h *GameHandler) catchUpOfflineGap(ctx context.Context, hero *model.Hero) b
if h.engine != nil && h.engine.IsTimePaused() { if h.engine != nil && h.engine.IsTimePaused() {
return false return false
} }
// Engine already advanced this hero since process start; do not run batch SimulateOneFight (second combat path).
if h.engine != nil && h.engine.HeroHasActiveMovement(hero.ID) {
return false
}
gapDuration := h.serverStartedAt.Sub(hero.UpdatedAt) gapDuration := h.serverStartedAt.Sub(hero.UpdatedAt)
if gapDuration < 30*time.Second { if gapDuration < 30*time.Second {
return false return false
@ -838,6 +847,13 @@ func (h *GameHandler) InitHero(w http.ResponseWriter, r *http.Request) {
} }
} }
// Resident heroes: single source of truth is the Engine (same ticks as WS observers).
if h.engine != nil && !simFrozen && h.engine.MergeResidentHeroState(hero) {
if err := h.store.Save(r.Context(), hero); err != nil {
h.logger.Warn("failed to persist engine-merged hero on init", "hero_id", hero.ID, "error", err)
}
}
// Catch-up simulation: cover the gap between hero.UpdatedAt and serverStartedAt // Catch-up simulation: cover the gap between hero.UpdatedAt and serverStartedAt
// (the period when the server was down and the offline simulator wasn't running). // (the period when the server was down and the offline simulator wasn't running).
offlineDuration := time.Since(hero.UpdatedAt) offlineDuration := time.Since(hero.UpdatedAt)

@ -105,7 +105,7 @@ func (h *Hub) Run() {
h.mu.Unlock() h.mu.Unlock()
h.logger.Info("client disconnected", "hero_id", heroID, "remaining_same_hero", remaining) h.logger.Info("client disconnected", "hero_id", heroID, "remaining_same_hero", remaining)
// Always persist; engine drops in-memory movement only when remaining == 0. // Always persist; engine keeps simulating movement/combat without a subscriber.
// Synchronous so a reconnect that loads from DB sees the latest save. // Synchronous so a reconnect that loads from DB sees the latest save.
if existed && h.OnDisconnect != nil { if existed && h.OnDisconnect != nil {
h.OnDisconnect(heroID, remaining) h.OnDisconnect(heroID, remaining)
@ -135,6 +135,9 @@ func (h *Hub) BroadcastEvent(event model.CombatEvent) {
// SendToHero sends a typed message to all WebSocket connections for a specific hero. // SendToHero sends a typed message to all WebSocket connections for a specific hero.
func (h *Hub) SendToHero(heroID int64, msgType string, payload any) { func (h *Hub) SendToHero(heroID int64, msgType string, payload any) {
if !h.IsHeroConnected(heroID) {
return
}
if msgType == "hero_state" { if msgType == "hero_state" {
if hero, ok := payload.(*model.Hero); ok { if hero, ok := payload.(*model.Hero); ok {
model.AttachDebuffCatalogForClient(hero) model.AttachDebuffCatalogForClient(hero)

@ -713,6 +713,55 @@ func (s *HeroStore) ListOfflineHeroes(ctx context.Context, offlineThreshold time
return heroes, nil return heroes, nil
} }
// ListHeroesForEngineBootstrap returns heroes that should be loaded into the game engine after a cold start:
// session ended (ws_disconnected_at set) and simulatable world state. Limit caps memory use.
func (s *HeroStore) ListHeroesForEngineBootstrap(ctx context.Context, limit int) ([]*model.Hero, error) {
if limit <= 0 {
limit = 500
}
if limit > 2000 {
limit = 2000
}
query := heroSelectQuery + `
WHERE h.hp > 0 AND h.ws_disconnected_at IS NOT NULL
AND (
(h.state = 'walking'
AND (h.move_state IS NULL OR h.move_state NOT IN ('in_town', 'resting')))
OR h.state IN ('resting', 'in_town', 'fighting')
)
ORDER BY h.updated_at ASC
LIMIT $1
`
rows, err := s.pool.Query(ctx, query, limit)
if err != nil {
return nil, fmt.Errorf("list heroes for engine bootstrap: %w", err)
}
defer rows.Close()
var heroes []*model.Hero
for rows.Next() {
h, err := scanHeroFromRows(rows)
if err != nil {
return nil, fmt.Errorf("list heroes for engine bootstrap scan: %w", err)
}
heroes = append(heroes, h)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("list heroes for engine bootstrap rows: %w", err)
}
for _, h := range heroes {
if err := s.loadHeroGear(ctx, h); err != nil {
return nil, fmt.Errorf("list heroes for engine bootstrap load gear: %w", err)
}
if err := s.loadHeroInventory(ctx, h); err != nil {
return nil, fmt.Errorf("list heroes for engine bootstrap load inventory: %w", err)
}
}
return heroes, nil
}
// scanHeroFromRows scans the current row from pgx.Rows into a Hero struct. // scanHeroFromRows scans the current row from pgx.Rows into a Hero struct.
// Gear is loaded separately via loadHeroGear after scanning. // Gear is loaded separately via loadHeroGear after scanning.
func scanHeroFromRows(rows pgx.Rows) (*model.Hero, error) { func scanHeroFromRows(rows pgx.Rows) (*model.Hero, error) {

Loading…
Cancel
Save