Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a6873816 authored by root's avatar root
Browse files

feat: Implement RFC 5424-Compliant Logs for Streaming Analytics

parent 87270aa6
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -2,12 +2,21 @@ package main

import (
	//Internal dependencies
	"proxy-stt/app/logger"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/streamlog"
	"proxy-stt/app/quic-router"
	"proxy-stt/app/websocket"
)

func main() {
	logPath := "/var/log/murena/streams.log"
	if err := streamlog.Init(logPath); err != nil {
		logger.GetLogger().Fatal().Err(err).Msg("Failed to init stream logger")
	}

	defer streamlog.CloseLogger()

	murenaUserCanDoSTT := murenaApi.UserCanDoSTTProd{}
	websocketGetConnectionProd := websocket.GetConnectionProd{}
	websocketStartConnectionProd := websocket.StartConnectionProd{}
+22 −0
Original line number Diff line number Diff line
@@ -7,11 +7,14 @@ import (
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	//Internal dependencies
	"proxy-stt/app/logger"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/streamlog"
	"proxy-stt/app/utils"
	"proxy-stt/app/websocket"

	// External dependencies
@@ -21,6 +24,11 @@ import (

var log = logger.GetLogger()

var (
	activeStreams int
	streamMu      sync.Mutex
)

type UploadHandler struct {
	MurenaUserCanDoSTT murenaApi.UserCanDoSTTInterface
	WebsocketGetConnection websocket.GetConnectionInterface
@@ -196,11 +204,25 @@ func (h *UploadHandler) HandleUpload(context *gin.Context) {
	reqID := context.GetString("reqID")
	baseLog := context.GetString("baseLog")
	lang := context.GetString("lang")
	username := context.GetString("username")
	clientID := fmt.Sprintf("client_%d", time.Now().UnixNano())
	stopRead := make(chan bool)
	startTime := time.Now()

	streamMu.Lock()
	concurrent := activeStreams
	activeStreams++
	streamMu.Unlock()

	streamlog.StreamStart(clientID, utils.AnonymizeDeterministic(username), concurrent)
	go h.WebsocketStartConnection.StartConnection(reqID, lang, clientID, context, stopRead)
	defer func() {
		streamMu.Lock()
		activeStreams--
		streamMu.Unlock()

		duration := int(time.Since(startTime).Seconds())
		streamlog.StreamEnd(clientID, duration)
		h.WebsocketCloseConnection.CloseConnection(reqID, clientID, stopRead);
		log.Info().Msgf("%s Closed", baseLog)
	}()
+88 −0
Original line number Diff line number Diff line
package streamlog

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	// External dependencies
	"github.com/rs/zerolog"
)

var streamSDID = "stream@12345"
var hostname = "murena-streamer"
var appName = "proxy-stt"
var streamlogger zerolog.Logger
var (
	logFile *os.File
	mu      sync.Mutex
)

func Init(logPath string) error {
	if v := os.Getenv("STREAM_SDID"); v != "" {
		streamSDID = v
	}
	if v := os.Getenv("PROXY_HOSTNAME"); v != "" {
		hostname = v
	}
	if v := os.Getenv("APP_NAME"); v != "" {
		appName = v
	}

	dir := filepath.Dir(logPath)
    if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("cannot create log directory: %w", err)
    }

	f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("cannot open log file: %w", err)
	}

	streamlogger = zerolog.New(f).With().Logger()
	logFile = f

	return nil
}

func CloseLogger() error {
    mu.Lock()
    defer mu.Unlock()
    if logFile != nil {
        err := logFile.Close()
        logFile = nil
        return err
    }
    return nil
}

func formatRFC5424(msg string, sd string) string {
	pri := 165
	version := 1
	timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
	procID := "-"
	return fmt.Sprintf("<%d>%d %s %s %s %s %s %s",
		pri, version, timestamp, hostname, appName, procID, sd, msg)
}

func StreamStart(streamID, userID string, concurrent int) {
	sd := fmt.Sprintf(`[%s stream_id="%s" user_id="%s" concurrent_streams="%d"]`, streamSDID, streamID, userID, concurrent)
	WriteRFC5424Line(formatRFC5424("Stream started", sd))
}

func StreamEnd(streamID string, duration int) {
	sd := fmt.Sprintf(`[%s stream_id="%s" duration_sec="%d"]`, streamSDID, streamID, duration)
	WriteRFC5424Line(formatRFC5424("Stream ended", sd))
}

func WriteRFC5424Line(line string) error {
    if logFile == nil {
		return fmt.Errorf("log file not initialized")
	}
	mu.Lock()
	defer mu.Unlock()
	_, err := logFile.WriteString(line + "\n")
	return err
}
 No newline at end of file

app/utils/utils.go

0 → 100644
+22 −0
Original line number Diff line number Diff line
package utils

import (
	"crypto/sha256"
	"encoding/base32"
)

func AnonymizeDeterministic(s string) string {
	hash := sha256.Sum256([]byte(s))
	hashStr := base32.StdEncoding.EncodeToString(hash[:])

	result := []rune(s)
	hashIndex := 0
	for i := range result {
		result[i] = rune(hashStr[hashIndex])
		hashIndex++
		if hashIndex >= len(hashStr) {
			hashIndex = 0
		}
	}
	return string(result)
}
 No newline at end of file