Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 76b8b2a4 authored by Fahim Salam Chowdhury's avatar Fahim Salam Chowdhury 👽
Browse files

Merge branch '4675-add-compliant-logs-for-streaming-analytics' into 'main'

Add compliant logs for streaming analytics

See merge request e/infra/proxy-stt!4
parents d0fef329 3b0035b7
Loading
Loading
Loading
Loading
+7 −0
Original line number Original line Diff line number Diff line
@@ -2,6 +2,7 @@


| Environment name                     | Default value            | Mandatory                                            | Description                                                                     |
| Environment name                     | Default value            | Mandatory                                            | Description                                                                     |
|--------------------------------------|--------------------------|------------------------------------------------------|---------------------------------------------------------------------------------|
|--------------------------------------|--------------------------|------------------------------------------------------|---------------------------------------------------------------------------------|
| APP_NAME                             | proxy-stt                | No                                                   | App name for stream logs                                                        |
| CRT_PATH                             | -                        | Yes                                                  | Path of the crt file for certificate                                            |
| CRT_PATH                             | -                        | Yes                                                  | Path of the crt file for certificate                                            |
| GIN_MODE                             | debug                    | No                                                   | Must be `debug` or `release`                                                    |
| GIN_MODE                             | debug                    | No                                                   | Must be `debug` or `release`                                                    |
| KEY_PATH                             | -                        | Yes                                                  | Path of the key file for certificate                                            |
| KEY_PATH                             | -                        | Yes                                                  | Path of the key file for certificate                                            |
@@ -10,8 +11,14 @@
| MURENA_API_ADMIN_PASSWORD            | -                        | Yes                                                  | Murena API admin password                                                       |
| MURENA_API_ADMIN_PASSWORD            | -                        | Yes                                                  | Murena API admin password                                                       |
| MURENA_API_HOST                      | -                        | Yes                                                  | Murena API host server                                                          |
| MURENA_API_HOST                      | -                        | Yes                                                  | Murena API host server                                                          |
| OPENAI_API_KEY                       | -                        | Yes                                                  | OpenAI API key                                                                  |
| OPENAI_API_KEY                       | -                        | Yes                                                  | OpenAI API key                                                                  |
| PROXY_HOSTNAME                       | murena-streamer             | No                                                | Hostname for stream logs                                                        |
| STREAM_SDID                          | stream@12345             | No                                                   | Stream SD ID for stream logs                                                    |
| PORT                                 | 888                      | No                                                   | The port used by quic protocol                                                  |
| PORT                                 | 888                      | No                                                   | The port used by quic protocol                                                  |


# Stream logs

Path : `/var/log/murena/streams.log`

# Tests
# Tests


## How to run tests ?
## How to run tests ?
+10 −0
Original line number Original line Diff line number Diff line
@@ -2,12 +2,22 @@ package main


import (
import (
	//Internal dependencies
	//Internal dependencies
	"proxy-stt/app/logger"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/streamlog"
	"proxy-stt/app/quic-router"
	"proxy-stt/app/quic-router"
	"proxy-stt/app/websocket"
	"proxy-stt/app/websocket"
)
)


var log = logger.GetLogger()

func main() {
func main() {
	if err := streamlog.Init(); err != nil {
		log.Fatal().Err(err).Msg("Failed to init stream logger")
	}

	defer streamlog.CloseLogger()

	murenaUserCanDoSTT := murenaApi.UserCanDoSTTProd{}
	murenaUserCanDoSTT := murenaApi.UserCanDoSTTProd{}
	websocketGetConnectionProd := websocket.GetConnectionProd{}
	websocketGetConnectionProd := websocket.GetConnectionProd{}
	websocketStartConnectionProd := websocket.StartConnectionProd{}
	websocketStartConnectionProd := websocket.StartConnectionProd{}
+22 −0
Original line number Original line Diff line number Diff line
@@ -7,11 +7,14 @@ import (
	"net/http"
	"net/http"
	"os"
	"os"
	"strings"
	"strings"
	"sync"
	"time"
	"time"


	//Internal dependencies
	//Internal dependencies
	"proxy-stt/app/logger"
	"proxy-stt/app/logger"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/murena-api"
	"proxy-stt/app/streamlog"
	"proxy-stt/app/utils"
	"proxy-stt/app/websocket"
	"proxy-stt/app/websocket"


	// External dependencies
	// External dependencies
@@ -21,6 +24,11 @@ import (


var log = logger.GetLogger()
var log = logger.GetLogger()


var (
	activeStreams int
	streamMu      sync.Mutex
)

type UploadHandler struct {
type UploadHandler struct {
	MurenaUserCanDoSTT murenaApi.UserCanDoSTTInterface
	MurenaUserCanDoSTT murenaApi.UserCanDoSTTInterface
	WebsocketGetConnection websocket.GetConnectionInterface
	WebsocketGetConnection websocket.GetConnectionInterface
@@ -207,11 +215,25 @@ func (h *UploadHandler) HandleUpload(context *gin.Context) {
	reqID := context.GetString("reqID")
	reqID := context.GetString("reqID")
	baseLog := context.GetString("baseLog")
	baseLog := context.GetString("baseLog")
	lang := context.GetString("lang")
	lang := context.GetString("lang")
	username := context.GetString("username")
	clientID := fmt.Sprintf("client_%d", time.Now().UnixNano())
	clientID := fmt.Sprintf("client_%d", time.Now().UnixNano())
	stopRead := make(chan bool)
	stopRead := make(chan bool)
	startTime := time.Now()


	streamMu.Lock()
	concurrent := activeStreams
	activeStreams++
	streamMu.Unlock()

	streamlog.StreamStart(clientID, utils.AnonymizeDeterministic(username), concurrent)
	go h.WebsocketStartConnection.StartConnection(reqID, lang, clientID, context, stopRead)
	go h.WebsocketStartConnection.StartConnection(reqID, lang, clientID, context, stopRead)
	defer func() {
	defer func() {
		streamMu.Lock()
		activeStreams--
		streamMu.Unlock()

		duration := int(time.Since(startTime).Seconds())
		streamlog.StreamEnd(clientID, duration)
		h.WebsocketCloseConnection.CloseConnection(reqID, clientID, stopRead);
		h.WebsocketCloseConnection.CloseConnection(reqID, clientID, stopRead);
		log.Info().Msgf("%s Closed", baseLog)
		log.Info().Msgf("%s Closed", baseLog)
	}()
	}()
+118 −0
Original line number Original line Diff line number Diff line
package streamlog

import (
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"sync"
	"time"

	//Internal dependencies
	"proxy-stt/app/logger"
)

const baseLog = "[streamlog]"
const logPath = "/var/log/murena/streams.log"
var streamSDID = "stream@12345"
var hostname = "murena-streamer"
var appName = "proxy-stt"
var (
	logFile *os.File
	mu      sync.Mutex
)
var log = logger.GetLogger()

func Init() error {
	if envValue := os.Getenv("STREAM_SDID"); envValue != "" {
		streamSDID = envValue
	}
	if envValue := os.Getenv("PROXY_HOSTNAME"); envValue != "" {
		hostname = envValue
	}
	if envValue := os.Getenv("APP_NAME"); envValue != "" {
		appName = envValue
	}

	dir := filepath.Dir(logPath)
    if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("cannot create log directory: %w", err)
    }

	if err := openLogFile(); err != nil {
		return fmt.Errorf("Fail to open log file: %w", err)
	}

	setupSIGHUPHandler()

	return nil
}

func setupSIGHUPHandler() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)
	go func() {
		for range sigs {
			log.Info().Msgf("%s SIGHUP received, reopen log file", baseLog)
			if err := openLogFile(); err != nil {
				log.Error().Msgf("%s Fail to open loglog file: %v", baseLog, err)
			}
		}
	}()
}

func openLogFile() error {
	mu.Lock()
	defer mu.Unlock()
	if logFile != nil {
		logFile.Close()
	}
	f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("cannot open log file: %w", err)
	}
	logFile = f
	log.Debug().Msgf("Opened log file on %s", logPath)
	return nil
}

func CloseLogger() error {
    mu.Lock()
    defer mu.Unlock()
    if logFile != nil {
        err := logFile.Close()
        logFile = nil
        return err
    }
    return nil
}

func formatRFC5424(msg string, sd string) string {
	pri := 165
	version := 1
	timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
	procID := "-"
	return fmt.Sprintf("<%d>%d %s %s %s %s %s %s",
		pri, version, timestamp, hostname, appName, procID, sd, msg)
}

func StreamStart(streamID, userID string, concurrent int) {
	sd := fmt.Sprintf(`[%s stream_id="%s" user_id="%s" concurrent_streams="%d"]`, streamSDID, streamID, userID, concurrent)
	WriteRFC5424Line(formatRFC5424("Stream started", sd))
}

func StreamEnd(streamID string, duration int) {
	sd := fmt.Sprintf(`[%s stream_id="%s" duration_sec="%d"]`, streamSDID, streamID, duration)
	WriteRFC5424Line(formatRFC5424("Stream ended", sd))
}

func WriteRFC5424Line(line string) error {
    if logFile == nil {
		return fmt.Errorf("log file not initialized")
	}
	mu.Lock()
	defer mu.Unlock()
	_, err := logFile.WriteString(line + "\n")
	return err
}
 No newline at end of file

app/utils/utils.go

0 → 100644
+22 −0
Original line number Original line Diff line number Diff line
package utils

import (
	"crypto/sha256"
	"encoding/base32"
)

func AnonymizeDeterministic(s string) string {
	hash := sha256.Sum256([]byte(s))
	hashStr := base32.StdEncoding.EncodeToString(hash[:])

	result := []rune(s)
	hashIndex := 0
	for i := range result {
		result[i] = rune(hashStr[hashIndex])
		hashIndex++
		if hashIndex >= len(hashStr) {
			hashIndex = 0
		}
	}
	return string(result)
}
 No newline at end of file