Loading .gitlab-ci.yml +21 −6 Original line number Diff line number Diff line Loading @@ -8,12 +8,6 @@ before_script: - echo $CI_JOB_TOKEN | docker login -u gitlab-ci-token --password-stdin $CI_REGISTRY docker: extends: .docker script: - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG . rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' docker-latest: extends: .docker Loading @@ -30,3 +24,24 @@ docker-tag: - docker push $CI_REGISTRY_IMAGE:${CI_COMMIT_TAG/v/} rules: - if: '$CI_COMMIT_TAG' docker-branch: extends: .docker script: - docker build -t $CI_REGISTRY_IMAGE:${CI_COMMIT_REF_NAME//\//-} . - docker push $CI_REGISTRY_IMAGE:${CI_COMMIT_REF_NAME//\//-} rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_COMMIT_REF_NAME != "main"' deploy-staging: stage: deploy needs: ["docker-branch"] rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_COMMIT_REF_NAME != "main"' variables: STT_CONTAINER_IMAGE_TAG: $CI_COMMIT_REF_NAME trigger: project: 'e/online-services/infra/stt-proxy-compose-stack' branch: 'main' when: manual README.md +7 −0 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ | Environment name | Default value | Mandatory | Description | |--------------------------------------|--------------------------|------------------------------------------------------|---------------------------------------------------------------------------------| | APP_NAME | proxy-stt | No | App name for stream logs | | CRT_PATH | - | Yes | Path of the crt file for certificate | | GIN_MODE | debug | No | Must be `debug` or `release` | | KEY_PATH | - | Yes | Path of the key file for certificate | Loading @@ -10,9 +11,15 @@ | MURENA_API_ADMIN_PASSWORD | - | Yes | Murena API admin password | | MURENA_API_HOST | - | Yes | Murena API host server | | OPENAI_API_KEY | - | Yes | OpenAI API key | | PROXY_HOSTNAME | murena-streamer | No | Hostname for stream logs | | STREAM_SDID | stream@12345 | No | Stream SD ID for stream logs | | PORT | 888 | No | The port used by quic protocol | | SENTRY_DSN | - | Yes | DSN for Sentry | # Stream logs Path : `/var/log/murena/streams.log` # Tests ## How to run tests ? Loading app/main.go +8 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import ( //Internal dependencies "proxy-stt/app/logger" "proxy-stt/app/murena-api" "proxy-stt/app/streamlog" "proxy-stt/app/quic-router" "proxy-stt/app/sentry-wrapper" "proxy-stt/app/utils" Loading @@ -13,6 +14,13 @@ import ( var log = logger.GetLogger() func main() { // Streamlog if err := streamlog.Init(); err != nil { log.Fatal().Err(err).Msg("Failed to init stream logger") } defer streamlog.CloseLogger() // Sentry flush, err := sentryWrapper.Init() if err != nil { log.Fatal().Msg(err.Error()) Loading app/quic-router/quic-router.go +38 −6 Original line number Diff line number Diff line Loading @@ -7,11 +7,13 @@ import ( "net/http" "os" "strings" "sync" "time" //Internal dependencies "proxy-stt/app/logger" "proxy-stt/app/murena-api" "proxy-stt/app/streamlog" "proxy-stt/app/utils" "proxy-stt/app/websocket" Loading @@ -22,6 +24,11 @@ import ( var log = logger.GetLogger() var ( activeStreams int streamMu sync.Mutex ) type UploadHandler struct { MurenaUserCanDoSTT murenaApi.UserCanDoSTTInterface WebsocketGetConnection websocket.GetConnectionInterface Loading Loading @@ -148,9 +155,7 @@ func (h *UploadHandler) AuthMiddlewareV2() gin.HandlerFunc { // Username username := c.DefaultQuery("username", "") log.Debug().Msgf("%s Requested username : %s", baseLog, username) if strings.HasSuffix(username, "@murena.io") { username = strings.TrimSuffix(username, "@murena.io") } username = extractUsernameFromMail(username) log.Debug().Msgf("%s Formated username for Murena API request : %s", baseLog, username) granted := h.MurenaUserCanDoSTT.InvokeV2(reqID, accessToken, username) Loading @@ -169,6 +174,21 @@ func (h *UploadHandler) AuthMiddlewareV2() gin.HandlerFunc { } } func extractUsernameFromMail(mail string) string { suffixes := []string{ "@murena.io", "@murenatest.io", } for _, s := range suffixes { if strings.HasSuffix(mail, s) { return strings.TrimSuffix(mail, s) } } return mail } func (h *UploadHandler) AuthMiddlewareV1() gin.HandlerFunc { return func(c *gin.Context) { reqID := fmt.Sprintf("req_%d", time.Now().UnixNano()) Loading @@ -182,9 +202,7 @@ func (h *UploadHandler) AuthMiddlewareV1() gin.HandlerFunc { // Username username := c.DefaultQuery("username", "") log.Debug().Msgf("%s Requested username : %s", baseLog, username) if strings.HasSuffix(username, "@murena.io") { username = strings.TrimSuffix(username, "@murena.io") } username = extractUsernameFromMail(username) log.Debug().Msgf("%s Formated username for Murena API request : %s", baseLog, username) granted := h.MurenaUserCanDoSTT.InvokeV1(reqID, username) Loading @@ -210,11 +228,25 @@ func (h *UploadHandler) HandleUpload(context *gin.Context) { reqID := context.GetString("reqID") baseLog := context.GetString("baseLog") lang := context.GetString("lang") username := context.GetString("username") clientID := fmt.Sprintf("client_%d", time.Now().UnixNano()) stopRead := make(chan bool) startTime := time.Now() streamMu.Lock() concurrent := activeStreams activeStreams++ streamMu.Unlock() streamlog.StreamStart(clientID, utils.AnonymizeDeterministic(username), concurrent) go h.WebsocketStartConnection.StartConnection(reqID, lang, clientID, context, stopRead) defer func() { streamMu.Lock() activeStreams-- streamMu.Unlock() duration := int(time.Since(startTime).Seconds()) streamlog.StreamEnd(clientID, duration) h.WebsocketCloseConnection.CloseConnection(reqID, clientID, stopRead); log.Info().Msgf("%s Closed", baseLog) }() Loading app/streamlog/streamlog.go 0 → 100644 +118 −0 Original line number Diff line number Diff line package streamlog import ( "fmt" "os" "os/signal" "path/filepath" "syscall" "sync" "time" //Internal dependencies "proxy-stt/app/logger" ) const baseLog = "[streamlog]" const logPath = "/var/log/murena/streams.log" var streamSDID = "stream@12345" var hostname = "murena-streamer" var appName = "proxy-stt" var ( logFile *os.File mu sync.Mutex ) var log = logger.GetLogger() func Init() error { if envValue := os.Getenv("STREAM_SDID"); envValue != "" { streamSDID = envValue } if envValue := os.Getenv("PROXY_HOSTNAME"); envValue != "" { hostname = envValue } if envValue := os.Getenv("APP_NAME"); envValue != "" { appName = envValue } dir := filepath.Dir(logPath) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("cannot create log directory: %w", err) } if err := openLogFile(); err != nil { return fmt.Errorf("Fail to open log file: %w", err) } setupSIGHUPHandler() return nil } func setupSIGHUPHandler() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) go func() { for range sigs { log.Info().Msgf("%s SIGHUP received, reopen log file", baseLog) if err := openLogFile(); err != nil { log.Error().Msgf("%s Fail to open loglog file: %v", baseLog, err) } } }() } func openLogFile() error { mu.Lock() defer mu.Unlock() if logFile != nil { logFile.Close() } f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("cannot open log file: %w", err) } logFile = f log.Debug().Msgf("Opened log file on %s", logPath) return nil } func CloseLogger() error { mu.Lock() defer mu.Unlock() if logFile != nil { err := logFile.Close() logFile = nil return err } return nil } func formatRFC5424(msg string, sd string) string { pri := 165 version := 1 timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") procID := "-" return fmt.Sprintf("<%d>%d %s %s %s %s %s %s", pri, version, timestamp, hostname, appName, procID, sd, msg) } func StreamStart(streamID, userID string, concurrent int) { sd := fmt.Sprintf(`[%s stream_id="%s" user_id="%s" concurrent_streams="%d"]`, streamSDID, streamID, userID, concurrent) WriteRFC5424Line(formatRFC5424("Stream started", sd)) } func StreamEnd(streamID string, duration int) { sd := fmt.Sprintf(`[%s stream_id="%s" duration_sec="%d"]`, streamSDID, streamID, duration) WriteRFC5424Line(formatRFC5424("Stream ended", sd)) } func WriteRFC5424Line(line string) error { if logFile == nil { return fmt.Errorf("log file not initialized") } mu.Lock() defer mu.Unlock() _, err := logFile.WriteString(line + "\n") return err } No newline at end of file Loading
.gitlab-ci.yml +21 −6 Original line number Diff line number Diff line Loading @@ -8,12 +8,6 @@ before_script: - echo $CI_JOB_TOKEN | docker login -u gitlab-ci-token --password-stdin $CI_REGISTRY docker: extends: .docker script: - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG . rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' docker-latest: extends: .docker Loading @@ -30,3 +24,24 @@ docker-tag: - docker push $CI_REGISTRY_IMAGE:${CI_COMMIT_TAG/v/} rules: - if: '$CI_COMMIT_TAG' docker-branch: extends: .docker script: - docker build -t $CI_REGISTRY_IMAGE:${CI_COMMIT_REF_NAME//\//-} . - docker push $CI_REGISTRY_IMAGE:${CI_COMMIT_REF_NAME//\//-} rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_COMMIT_REF_NAME != "main"' deploy-staging: stage: deploy needs: ["docker-branch"] rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_COMMIT_REF_NAME != "main"' variables: STT_CONTAINER_IMAGE_TAG: $CI_COMMIT_REF_NAME trigger: project: 'e/online-services/infra/stt-proxy-compose-stack' branch: 'main' when: manual
README.md +7 −0 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ | Environment name | Default value | Mandatory | Description | |--------------------------------------|--------------------------|------------------------------------------------------|---------------------------------------------------------------------------------| | APP_NAME | proxy-stt | No | App name for stream logs | | CRT_PATH | - | Yes | Path of the crt file for certificate | | GIN_MODE | debug | No | Must be `debug` or `release` | | KEY_PATH | - | Yes | Path of the key file for certificate | Loading @@ -10,9 +11,15 @@ | MURENA_API_ADMIN_PASSWORD | - | Yes | Murena API admin password | | MURENA_API_HOST | - | Yes | Murena API host server | | OPENAI_API_KEY | - | Yes | OpenAI API key | | PROXY_HOSTNAME | murena-streamer | No | Hostname for stream logs | | STREAM_SDID | stream@12345 | No | Stream SD ID for stream logs | | PORT | 888 | No | The port used by quic protocol | | SENTRY_DSN | - | Yes | DSN for Sentry | # Stream logs Path : `/var/log/murena/streams.log` # Tests ## How to run tests ? Loading
app/main.go +8 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import ( //Internal dependencies "proxy-stt/app/logger" "proxy-stt/app/murena-api" "proxy-stt/app/streamlog" "proxy-stt/app/quic-router" "proxy-stt/app/sentry-wrapper" "proxy-stt/app/utils" Loading @@ -13,6 +14,13 @@ import ( var log = logger.GetLogger() func main() { // Streamlog if err := streamlog.Init(); err != nil { log.Fatal().Err(err).Msg("Failed to init stream logger") } defer streamlog.CloseLogger() // Sentry flush, err := sentryWrapper.Init() if err != nil { log.Fatal().Msg(err.Error()) Loading
app/quic-router/quic-router.go +38 −6 Original line number Diff line number Diff line Loading @@ -7,11 +7,13 @@ import ( "net/http" "os" "strings" "sync" "time" //Internal dependencies "proxy-stt/app/logger" "proxy-stt/app/murena-api" "proxy-stt/app/streamlog" "proxy-stt/app/utils" "proxy-stt/app/websocket" Loading @@ -22,6 +24,11 @@ import ( var log = logger.GetLogger() var ( activeStreams int streamMu sync.Mutex ) type UploadHandler struct { MurenaUserCanDoSTT murenaApi.UserCanDoSTTInterface WebsocketGetConnection websocket.GetConnectionInterface Loading Loading @@ -148,9 +155,7 @@ func (h *UploadHandler) AuthMiddlewareV2() gin.HandlerFunc { // Username username := c.DefaultQuery("username", "") log.Debug().Msgf("%s Requested username : %s", baseLog, username) if strings.HasSuffix(username, "@murena.io") { username = strings.TrimSuffix(username, "@murena.io") } username = extractUsernameFromMail(username) log.Debug().Msgf("%s Formated username for Murena API request : %s", baseLog, username) granted := h.MurenaUserCanDoSTT.InvokeV2(reqID, accessToken, username) Loading @@ -169,6 +174,21 @@ func (h *UploadHandler) AuthMiddlewareV2() gin.HandlerFunc { } } func extractUsernameFromMail(mail string) string { suffixes := []string{ "@murena.io", "@murenatest.io", } for _, s := range suffixes { if strings.HasSuffix(mail, s) { return strings.TrimSuffix(mail, s) } } return mail } func (h *UploadHandler) AuthMiddlewareV1() gin.HandlerFunc { return func(c *gin.Context) { reqID := fmt.Sprintf("req_%d", time.Now().UnixNano()) Loading @@ -182,9 +202,7 @@ func (h *UploadHandler) AuthMiddlewareV1() gin.HandlerFunc { // Username username := c.DefaultQuery("username", "") log.Debug().Msgf("%s Requested username : %s", baseLog, username) if strings.HasSuffix(username, "@murena.io") { username = strings.TrimSuffix(username, "@murena.io") } username = extractUsernameFromMail(username) log.Debug().Msgf("%s Formated username for Murena API request : %s", baseLog, username) granted := h.MurenaUserCanDoSTT.InvokeV1(reqID, username) Loading @@ -210,11 +228,25 @@ func (h *UploadHandler) HandleUpload(context *gin.Context) { reqID := context.GetString("reqID") baseLog := context.GetString("baseLog") lang := context.GetString("lang") username := context.GetString("username") clientID := fmt.Sprintf("client_%d", time.Now().UnixNano()) stopRead := make(chan bool) startTime := time.Now() streamMu.Lock() concurrent := activeStreams activeStreams++ streamMu.Unlock() streamlog.StreamStart(clientID, utils.AnonymizeDeterministic(username), concurrent) go h.WebsocketStartConnection.StartConnection(reqID, lang, clientID, context, stopRead) defer func() { streamMu.Lock() activeStreams-- streamMu.Unlock() duration := int(time.Since(startTime).Seconds()) streamlog.StreamEnd(clientID, duration) h.WebsocketCloseConnection.CloseConnection(reqID, clientID, stopRead); log.Info().Msgf("%s Closed", baseLog) }() Loading
app/streamlog/streamlog.go 0 → 100644 +118 −0 Original line number Diff line number Diff line package streamlog import ( "fmt" "os" "os/signal" "path/filepath" "syscall" "sync" "time" //Internal dependencies "proxy-stt/app/logger" ) const baseLog = "[streamlog]" const logPath = "/var/log/murena/streams.log" var streamSDID = "stream@12345" var hostname = "murena-streamer" var appName = "proxy-stt" var ( logFile *os.File mu sync.Mutex ) var log = logger.GetLogger() func Init() error { if envValue := os.Getenv("STREAM_SDID"); envValue != "" { streamSDID = envValue } if envValue := os.Getenv("PROXY_HOSTNAME"); envValue != "" { hostname = envValue } if envValue := os.Getenv("APP_NAME"); envValue != "" { appName = envValue } dir := filepath.Dir(logPath) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("cannot create log directory: %w", err) } if err := openLogFile(); err != nil { return fmt.Errorf("Fail to open log file: %w", err) } setupSIGHUPHandler() return nil } func setupSIGHUPHandler() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) go func() { for range sigs { log.Info().Msgf("%s SIGHUP received, reopen log file", baseLog) if err := openLogFile(); err != nil { log.Error().Msgf("%s Fail to open loglog file: %v", baseLog, err) } } }() } func openLogFile() error { mu.Lock() defer mu.Unlock() if logFile != nil { logFile.Close() } f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("cannot open log file: %w", err) } logFile = f log.Debug().Msgf("Opened log file on %s", logPath) return nil } func CloseLogger() error { mu.Lock() defer mu.Unlock() if logFile != nil { err := logFile.Close() logFile = nil return err } return nil } func formatRFC5424(msg string, sd string) string { pri := 165 version := 1 timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") procID := "-" return fmt.Sprintf("<%d>%d %s %s %s %s %s %s", pri, version, timestamp, hostname, appName, procID, sd, msg) } func StreamStart(streamID, userID string, concurrent int) { sd := fmt.Sprintf(`[%s stream_id="%s" user_id="%s" concurrent_streams="%d"]`, streamSDID, streamID, userID, concurrent) WriteRFC5424Line(formatRFC5424("Stream started", sd)) } func StreamEnd(streamID string, duration int) { sd := fmt.Sprintf(`[%s stream_id="%s" duration_sec="%d"]`, streamSDID, streamID, duration) WriteRFC5424Line(formatRFC5424("Stream ended", sd)) } func WriteRFC5424Line(line string) error { if logFile == nil { return fmt.Errorf("log file not initialized") } mu.Lock() defer mu.Unlock() _, err := logFile.WriteString(line + "\n") return err } No newline at end of file