Skip to content

Commit aedbc61

Browse files
committed
Attempt at getting things working plus some addons. Heartbeat is still not finished. Logging is still not fully finished.
1 parent a41fc51 commit aedbc61

File tree

9 files changed

+879
-127
lines changed

9 files changed

+879
-127
lines changed

cmd/dxcluster-client/main.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,16 @@ func RunApplication(ctx context.Context, args []string) int {
140140

141141
// LOG_LEVEL was applied earlier so subsequent logs respect the selected level.
142142
logging.Info("Configuration loaded. WebPort: %d, MaxCache: %d, DataDir: %s", cfg.WebPort, cfg.MaxCache, cfg.DataDir)
143+
144+
// Debug configuration details
145+
logging.Debug("DX Config Debug: DXCHost='%s', DXCPort='%s', DXCCallsign='%s', DXCPassword='%s'", cfg.DXCHost, cfg.DXCPort, cfg.DXCCallsign, cfg.DXCPassword)
146+
logging.Debug("CLUSTERS JSON: '%s'", cfg.RawClustersJSON)
147+
143148
if len(cfg.Clusters) > 0 {
144149
logging.Info("DX Clusters configured: %d", len(cfg.Clusters))
150+
for i, cluster := range cfg.Clusters {
151+
logging.Info(" Cluster %d: %s (%s:%s) callsign=%s", i+1, cluster.ClusterName, cluster.Host, cluster.Port, cluster.Callsign)
152+
}
145153
} else {
146154
logging.Warn("No DX Clusters configured.")
147155
}
@@ -262,21 +270,28 @@ func RunApplication(ctx context.Context, args []string) int {
262270
spotChannels := make([]<-chan spot.Spot, 0) // Collect all spot output channels
263271

264272
// Add DX Cluster spot channels (convert cluster.Spot -> unified spot.Spot)
265-
for _, client := range dxClusterClients {
273+
for i, client := range dxClusterClients {
274+
clusterName := dxClusterNames[i]
266275
// Buffer the forwarder channel so brief startup ordering doesn't drop
267276
// spots that are emitted before the merge goroutines are scheduled.
268277
ch := make(chan spot.Spot, 8)
269278
// Forwarder goroutine: convert cluster.Spot to spot.Spot and send on ch
270-
go func(c *cluster.Client, out chan<- spot.Spot) {
279+
go func(c *cluster.Client, out chan<- spot.Spot, clusterName string) {
271280
defer close(out)
281+
forwardedCount := 0
272282
for {
273283
select {
274284
case <-ctx.Done():
285+
logging.Info("Cluster forwarder [%s] shutting down. Forwarded %d spots.", clusterName, forwardedCount)
275286
return
276287
case s, ok := <-c.SpotChan:
277288
if !ok {
289+
logging.Info("Cluster forwarder [%s] spot channel closed. Forwarded %d spots.", clusterName, forwardedCount)
278290
return
279291
}
292+
forwardedCount++
293+
logging.Info("CLUSTER [%s] RAW SPOT #%d: %s -> %s @ %.3f kHz - %s", clusterName, forwardedCount, s.Spotter, s.Spotted, s.Frequency, s.Message)
294+
280295
// Send into the buffered forwarder channel. If the application is
281296
// shutting down, exit promptly via ctx.Done.
282297
select {
@@ -288,12 +303,13 @@ func RunApplication(ctx context.Context, args []string) int {
288303
When: s.When,
289304
Source: s.Source,
290305
}:
306+
logging.Info("CLUSTER [%s] FORWARDED SPOT #%d to aggregator", clusterName, forwardedCount)
291307
case <-ctx.Done():
292308
return
293309
}
294310
}
295311
}
296-
}(client, ch)
312+
}(client, ch, clusterName)
297313

298314
spotChannels = append(spotChannels, ch)
299315

@@ -302,7 +318,9 @@ func RunApplication(ctx context.Context, args []string) int {
302318
for {
303319
select {
304320
case err := <-c.ErrorChan:
305-
logging.Error("from DX Cluster: %v", err)
321+
if err != nil {
322+
logging.Error("from DX Cluster: %v", err)
323+
}
306324
case msg := <-c.MessageChan:
307325
_ = msg // Ignore generic messages for now
308326
case <-ctx.Done():
@@ -357,31 +375,36 @@ func RunApplication(ctx context.Context, args []string) int {
357375
merged := mergeSpotChannels(spotChannels...)
358376
// Verbose tracing for spot pipeline; enabled by setting DX_API_VERBOSE_SPOT_PIPELINE=1
359377
verbose := os.Getenv("DX_API_VERBOSE_SPOT_PIPELINE") == "1" || strings.ToLower(os.Getenv("DX_API_VERBOSE_SPOT_PIPELINE")) == "true"
378+
spotCount := 0
360379
for {
361380
select {
362381
case <-ctx.Done():
363-
logging.Info("Spot aggregation goroutine shutting down due to context.")
382+
logging.Info("Spot aggregation goroutine shutting down due to context. Total spots processed: %d", spotCount)
364383
return
365384
case receivedSpot, ok := <-merged:
366385
if !ok {
367-
logging.Info("Spot aggregation: merged channel closed, exiting goroutine.")
386+
logging.Info("Spot aggregation: merged channel closed, exiting goroutine. Total spots processed: %d", spotCount)
368387
return
369388
}
389+
spotCount++
390+
logging.Info("SPOT #%d RECEIVED: %s -> %s @ %.3f kHz [%s] - %s", spotCount, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Source, receivedSpot.Message)
370391
if verbose {
371392
logging.Debug("Aggregator received spot: source=%s spotter=%s spotted=%s freq=%.3f msg=%q when=%s", receivedSpot.Source, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Message, receivedSpot.When.Format(time.RFC3339))
372393
}
373394
enrichedSpot, err := enrichSpot(ctx, receivedSpot, dxccClient, lotwClient)
374395
if err != nil {
375-
logging.Error("Failed to enrich spot from %s (%s->%s @ %.3f MHz): %v", receivedSpot.Source, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, err)
396+
logging.Error("Failed to enrich spot from %s (%s->%s @ %.3f kHz): %v", receivedSpot.Source, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, err)
376397
// Still add the non-enriched spot if enrichment fails, or discard?
377398
// For now, add the partially enriched one.
378399
centralSpotCache.AddSpot(receivedSpot)
400+
logging.Info("SPOT #%d CACHED (non-enriched): %s -> %s @ %.3f kHz [%s]", spotCount, receivedSpot.Spotter, receivedSpot.Spotted, receivedSpot.Frequency, receivedSpot.Source)
379401
if verbose {
380402
logging.Debug("Aggregator added non-enriched spot from %s (spotter=%s)", receivedSpot.Source, receivedSpot.Spotter)
381403
}
382404
continue
383405
}
384406
centralSpotCache.AddSpot(enrichedSpot)
407+
logging.Info("SPOT #%d CACHED (enriched): %s -> %s @ %.3f kHz [%s] Band=%s", spotCount, enrichedSpot.Spotter, enrichedSpot.Spotted, enrichedSpot.Frequency, enrichedSpot.Source, enrichedSpot.Band)
385408
if verbose {
386409
logging.Debug("Aggregator added enriched spot from %s (spotter=%s)", enrichedSpot.Source, enrichedSpot.Spotter)
387410
}
@@ -412,12 +435,12 @@ func RunApplication(ctx context.Context, args []string) int {
412435
c.Status(http.StatusOK)
413436
})
414437
apiGroup := router.Group(cfg.BaseURL)
415-
setupAPIRoutes(apiGroup, centralSpotCache, dxccClient)
438+
setupAPIRoutes(apiGroup, centralSpotCache, dxccClient, lotwClient)
416439
} else {
417440
router.GET("/healthz", func(c *gin.Context) {
418441
c.Status(http.StatusOK)
419442
})
420-
setupAPIRoutes(router.Group("/"), centralSpotCache, dxccClient)
443+
setupAPIRoutes(router.Group("/"), centralSpotCache, dxccClient, lotwClient)
421444
}
422445

423446
srv := &http.Server{
@@ -530,6 +553,7 @@ func enrichSpot(ctx context.Context, s spot.Spot, dxccClient *dxcc.Client, lotwC
530553

531554
// Add Band information
532555
s.Band = spot.BandFromName(s.Frequency)
556+
logging.Debug("Band assignment: %.3f kHz -> %s", s.Frequency, s.Band)
533557

534558
return s, nil
535559
}
@@ -561,7 +585,7 @@ func mergeSpotChannels(channels ...<-chan spot.Spot) <-chan spot.Spot {
561585
}
562586

563587
// setupAPIRoutes configures all API endpoints.
564-
func setupAPIRoutes(r *gin.RouterGroup, cache *spotCache, dxccClient *dxcc.Client) {
588+
func setupAPIRoutes(r *gin.RouterGroup, cache *spotCache, dxccClient *dxcc.Client, lotwClient *lotw.Client) {
565589
// GET /spots - Retrieve all cached spots.
566590
r.GET("/spots", func(c *gin.Context) {
567591
spots := cache.GetAllSpots()
@@ -697,6 +721,19 @@ func setupAPIRoutes(r *gin.RouterGroup, cache *spotCache, dxccClient *dxcc.Clien
697721
"oldest": nil, // Will be ISO string or null
698722
}
699723

724+
// Add DXCC and LoTW last update times
725+
if dxccLastUpdate, err := dxccClient.GetLastDownloadTime(context.Background()); err == nil && !dxccLastUpdate.IsZero() {
726+
stats["dxcc_last_updated"] = dxccLastUpdate.Format(time.RFC3339)
727+
} else {
728+
stats["dxcc_last_updated"] = nil
729+
}
730+
731+
if lotwLastUpdate, err := lotwClient.GetLastDownloadTime(context.Background()); err == nil && !lotwLastUpdate.IsZero() {
732+
stats["lotw_last_updated"] = lotwLastUpdate.Format(time.RFC3339)
733+
} else {
734+
stats["lotw_last_updated"] = nil
735+
}
736+
700737
if len(spots) > 0 {
701738
youngest := time.Time{} // Zero time
702739
oldest := time.Now().Add(100 * 365 * 24 * time.Hour) // Far future

0 commit comments

Comments
 (0)