From 527cf35bbaf11c1721f25df8cd02905f55ec1f0a Mon Sep 17 00:00:00 2001 From: Mingcai SHEN Date: Fri, 16 Sep 2022 14:29:38 +0800 Subject: [PATCH] Enabled rtsp capture. --- Makefile | 14 ++ go.mod | 7 +- go.sum | 1 - h264decoder.go | 140 +++++++++++++ http.go | 64 +++--- main.go | 74 +++---- recog.go | 522 ++++++++++++++++++++++++++----------------------- rtsp.go | 128 ++++++++++++ scene.go | 294 +++++++++++++++------------- 9 files changed, 793 insertions(+), 451 deletions(-) create mode 100644 Makefile create mode 100644 h264decoder.go create mode 100644 rtsp.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bcae411 --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +GOSRCS := $(shell find . -name "*.go") + +all: sport_rec_demo + +clean: + @rm -f sport_rec_demo + +run: sport_rec_demo + @echo "Starting ..." + @./sport_rec_demo serve -L :8080 + +sport_rec_demo: $(GOSRCS) Makefile + @echo "Building $@ ..." + @go build $@ \ No newline at end of file diff --git a/go.mod b/go.mod index 80e192d..b112f45 100644 --- a/go.mod +++ b/go.mod @@ -2,17 +2,20 @@ module sport_rec_demo go 1.18 +require ( + github.com/gofiber/fiber/v2 v2.30.0 + github.com/spf13/cobra v1.4.0 +) + require ( github.com/aler9/gortsplib v0.0.0-20220829125132-e99f799c07e7 // indirect github.com/andybalholm/brotli v1.0.4 // indirect - github.com/gofiber/fiber/v2 v2.30.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.15.0 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.9 // indirect github.com/pion/rtp v1.7.13 // indirect github.com/pion/sdp/v3 v3.0.5 // indirect - github.com/spf13/cobra v1.4.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.34.0 // indirect diff --git a/go.sum b/go.sum index ca997d0..4ffa264 100644 --- a/go.sum +++ b/go.sum @@ -42,7 +42,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/h264decoder.go b/h264decoder.go new file mode 100644 index 0000000..c24ed10 --- /dev/null +++ b/h264decoder.go @@ -0,0 +1,140 @@ +package main + +import ( + "fmt" + "image" + "unsafe" +) + +// #cgo pkg-config: libavcodec libavutil libswscale +// #include +// #include +// #include +import "C" + +func frameData(frame *C.AVFrame) **C.uint8_t { + return (**C.uint8_t)(unsafe.Pointer(&frame.data[0])) +} + +func frameLineSize(frame *C.AVFrame) *C.int { + return (*C.int)(unsafe.Pointer(&frame.linesize[0])) +} + +// h264Decoder is a wrapper around ffmpeg's H264 decoder. +type h264Decoder struct { + codecCtx *C.AVCodecContext + srcFrame *C.AVFrame + swsCtx *C.struct_SwsContext + dstFrame *C.AVFrame + dstFramePtr []uint8 +} + +// newH264Decoder allocates a new h264Decoder. +func newH264Decoder() (*h264Decoder, error) { + codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H264) + if codec == nil { + return nil, fmt.Errorf("avcodec_find_decoder() failed") + } + + codecCtx := C.avcodec_alloc_context3(codec) + if codecCtx == nil { + return nil, fmt.Errorf("avcodec_alloc_context3() failed") + } + + res := C.avcodec_open2(codecCtx, codec, nil) + if res < 0 { + C.avcodec_close(codecCtx) + return nil, fmt.Errorf("avcodec_open2() failed") + } + + srcFrame := C.av_frame_alloc() + if srcFrame == nil { + C.avcodec_close(codecCtx) + return nil, fmt.Errorf("av_frame_alloc() failed") + } + + return &h264Decoder{ + codecCtx: codecCtx, + srcFrame: srcFrame, + }, nil +} + +// close closes the decoder. +func (d *h264Decoder) close() { + if d.dstFrame != nil { + C.av_frame_free(&d.dstFrame) + } + + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + C.av_frame_free(&d.srcFrame) + C.avcodec_close(d.codecCtx) +} + +func (d *h264Decoder) decode(nalu []byte) (image.Image, error) { + nalu = append([]uint8{0x00, 0x00, 0x00, 0x01}, []uint8(nalu)...) + + // send frame to decoder + var avPacket C.AVPacket + avPacket.data = (*C.uint8_t)(C.CBytes(nalu)) + defer C.free(unsafe.Pointer(avPacket.data)) + avPacket.size = C.int(len(nalu)) + res := C.avcodec_send_packet(d.codecCtx, &avPacket) + if res < 0 { + return nil, nil + } + + // receive frame if available + res = C.avcodec_receive_frame(d.codecCtx, d.srcFrame) + if res < 0 { + return nil, nil + } + + // if frame size has changed, allocate needed objects + if d.dstFrame == nil || d.dstFrame.width != d.srcFrame.width || d.dstFrame.height != d.srcFrame.height { + if d.dstFrame != nil { + C.av_frame_free(&d.dstFrame) + } + + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + d.dstFrame = C.av_frame_alloc() + d.dstFrame.format = C.AV_PIX_FMT_RGBA + d.dstFrame.width = d.srcFrame.width + d.dstFrame.height = d.srcFrame.height + d.dstFrame.color_range = C.AVCOL_RANGE_JPEG + res = C.av_frame_get_buffer(d.dstFrame, 1) + if res < 0 { + return nil, fmt.Errorf("av_frame_get_buffer() err") + } + + d.swsCtx = C.sws_getContext(d.srcFrame.width, d.srcFrame.height, C.AV_PIX_FMT_YUV420P, + d.dstFrame.width, d.dstFrame.height, (int32)(d.dstFrame.format), C.SWS_BILINEAR, nil, nil, nil) + if d.swsCtx == nil { + return nil, fmt.Errorf("sws_getContext() err") + } + + dstFrameSize := C.av_image_get_buffer_size((int32)(d.dstFrame.format), d.dstFrame.width, d.dstFrame.height, 1) + d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize] + } + + // convert frame from YUV420 to RGB + res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame), + 0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame)) + if res < 0 { + return nil, fmt.Errorf("sws_scale() err") + } + + // embed frame into an image.Image + return &image.RGBA{ + Pix: d.dstFramePtr, + Stride: 4 * (int)(d.dstFrame.width), + Rect: image.Rectangle{ + Max: image.Point{(int)(d.dstFrame.width), (int)(d.dstFrame.height)}, + }, + }, nil +} diff --git a/http.go b/http.go index 944f520..6094a5d 100644 --- a/http.go +++ b/http.go @@ -1,49 +1,51 @@ package main import ( - "bytes" - "encoding/json" - "errors" - "log" - "net/http" - "time" + "bytes" + "encoding/json" + "errors" + "log" + "net/http" + "time" ) var httpClient *http.Client var fakeClient = false func httpPost(url string, data interface{}) error { - log.Println("httpPost:url>", url) - if fakeClient { - log.Println("httpPost:data> fake client ") - return nil - } - if bs, e := json.Marshal(data); nil != e { - return e - } else if resp, e := httpClient.Post(url, "application/json", bytes.NewBuffer(bs)); nil != e { - return e - } else if resp.StatusCode != http.StatusOK { - return errors.New("invalid resp status: " + resp.Status) - } else { - return nil - } + log.Println("httpPost:url>", url) + if fakeClient { + log.Println("httpPost:data> fake client ") + return nil + } + if bs, e := json.Marshal(data); nil != e { + return e + } else if resp, e := httpClient.Post(url, "application/json", bytes.NewBuffer(bs)); nil != e { + return e + } else if resp.StatusCode != http.StatusOK { + return errors.New("invalid resp status: " + resp.Status) + } else { + return nil + } } func httpGet(url string, resp interface{}) error { - return nil + return nil } -func httpPostEx(url string, data interface{}, retries int) { - for i := 0; i < retries; i++ { - if e := httpPost(url, data); nil != e { - log.Println(">>>", e) - time.Sleep(3 * time.Second) - } else { - return - } - } +func httpPostEx(url string, data interface{}, retries int) error { + var err error + for i := 0; i < retries; i++ { + if err = httpPost(url, data); nil != err { + log.Println(">>>", err) + time.Sleep(3 * time.Second) + } else { + return nil + } + } + return err } func init() { - httpClient = http.DefaultClient + httpClient = http.DefaultClient } diff --git a/main.go b/main.go index d485883..076eb23 100644 --- a/main.go +++ b/main.go @@ -1,26 +1,26 @@ package main import ( - "fmt" - "github.com/gofiber/fiber/v2" - "github.com/spf13/cobra" - "log" - "os" + "fmt" + "github.com/gofiber/fiber/v2" + "github.com/spf13/cobra" + "log" + "os" ) var listenAddr = ":3000" var rootCmd = &cobra.Command{ - Use: "sport_rec", - Short: "Sport Recognition Demo", - Long: "Sport Recognition Demo", + Use: "sport_rec", + Short: "Sport Recognition Demo", + Long: "Sport Recognition Demo", } func init() { - cobra.OnInitialize(initConfig) - rootCmd.PersistentFlags().StringVarP(&listenAddr, "listen", "L", ":3000", "Service listen address") - rootCmd.PersistentFlags().BoolVarP(&fakeClient, "fake", "F", false, "Fake HTP client instead of pushing") - rootCmd.AddCommand(serveCmd) + cobra.OnInitialize(initConfig) + rootCmd.PersistentFlags().StringVarP(&listenAddr, "listen", "L", ":3000", "Service listen address") + rootCmd.PersistentFlags().BoolVarP(&fakeClient, "fake", "F", false, "Fake HTP client instead of pushing") + rootCmd.AddCommand(serveCmd) } func initConfig() { @@ -28,32 +28,32 @@ func initConfig() { } var serveCmd = &cobra.Command{ - Use: "run", - Short: "Run service", - Run: func(cmd *cobra.Command, args []string) { - app := fiber.New() - - app.Get("/", func(c *fiber.Ctx) error { - return c.SendString("Hello, World!") - }) - - app.Post("/api/v1/setScene", setScene) - app.Post("/api/v1/setTestCmd", setTestCmd) - // app.Post("/api/v1/eventHandle") - app.Post("/api/v1/areaConfig", setAreaConfig) - app.Get("/api/v1/areaConfig", getAreaConfig) - app.Post("/api/v1/stopScene", stopScene) - app.Get("/api/v1/stopScene", stopScene) - app.Get("/api/v1/getStatus", getStatus) - app.Get("/api/v1/getTestStatus", getStatus) - - log.Fatal(app.Listen(listenAddr)) - }, + Use: "serve", + Short: "Run service", + Run: func(cmd *cobra.Command, args []string) { + app := fiber.New() + + app.Get("/", func(c *fiber.Ctx) error { + return c.SendString("Hello, World!") + }) + + app.Post("/api/v1/setScene", setScene) + app.Post("/api/v1/setTestCmd", setTestCmd) + // app.Post("/api/v1/eventHandle") + app.Post("/api/v1/areaConfig", setAreaConfig) + app.Get("/api/v1/areaConfig", getAreaConfig) + app.Post("/api/v1/stopScene", stopScene) + app.Get("/api/v1/stopScene", stopScene) + app.Get("/api/v1/getStatus", getStatus) + app.Get("/api/v1/getTestStatus", getStatus) + + log.Fatal(app.Listen(listenAddr)) + }, } func main() { - if err := rootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) - } + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } } diff --git a/recog.go b/recog.go index ae950ad..7170ca7 100644 --- a/recog.go +++ b/recog.go @@ -1,38 +1,42 @@ package main import ( - "errors" - "github.com/gofiber/fiber/v2" - "log" - "math/rand" - "time" + "bytes" + "encoding/base64" + "errors" + "github.com/gofiber/fiber/v2" + "image" + "image/jpeg" + "log" + "math/rand" + "time" ) type TestCommand struct { - Command string `json:"cmd"` - SequenceId int64 `json:"sequenceId"` + Command string `json:"cmd"` + SequenceId int64 `json:"sequenceId"` } type TestData struct { - State string `json:"state"` - ExceptionType int `json:"exceptionType,omitempty"` - ExceptionDesc string `json:"exceptionDesc,omitempty"` - Image string `json:"image,omitempty"` - DistanceCM int `json:"distanceCM,omitempty"` - Counts int `json:"counts,omitempty"` - DurationSec float32 `json:"durationSec,omitempty"` + State string `json:"state"` + ExceptionType int `json:"exceptionType,omitempty"` + ExceptionDesc string `json:"exceptionDesc,omitempty"` + Image string `json:"image,omitempty"` + DistanceCM int `json:"distanceCM,omitempty"` + Counts int `json:"counts,omitempty"` + DurationSec float32 `json:"durationSec,omitempty"` } type TestStat struct { - Scene string `json:"scene"` - Timestamp int64 `json:"timestamp"` - SequenceId int64 `json:"sequenceId"` - TestData TestData `json:"sceneData"` + Scene string `json:"scene"` + Timestamp int64 `json:"timestamp"` + SequenceId int64 `json:"sequenceId"` + TestData TestData `json:"sceneData"` } type TestException struct { - Type int - Desc string + Type int + Desc string } var currentTest *TestCommand @@ -43,253 +47,281 @@ var testStopChannel chan bool var testDurations map[string][2]int func setTestCmd(c *fiber.Ctx) error { - var testCmd TestCommand - if err := c.BodyParser(&testCmd); nil != err { - log.Println("ERROR:", c.Body()) - return failureResponse(c, "-1", err.Error(), 200) - } else if nil == currentScene { - return failureResponse(c, "1", "not in a working scene", 200) - } else { - switch testCmd.Command { - case "START": - log.Println("START ...") - if nil != currentTest { - return failureResponse(c, "2", "A test already started", 200) - } - if e := recogTaskProc(&testCmd); nil != e { - return failureResponse(c, "3", e.Error(), 200) - } - case "STOP": - if nil != testStopChannel { - testStopChannel <- true - log.Println("STOP ...") - } + var testCmd TestCommand + if err := c.BodyParser(&testCmd); nil != err { + log.Println("ERROR:", c.Body()) + return failureResponse(c, "-1", err.Error(), 200) + } else if nil == currentScene { + return failureResponse(c, "1", "not in a working scene", 200) + } else { + switch testCmd.Command { + case "START": + log.Println("START ...") + if nil != currentTest { + return failureResponse(c, "2", "A test already started", 200) + } + if e := recogTaskProc(&testCmd); nil != e { + return failureResponse(c, "3", e.Error(), 200) + } + case "STOP": + if nil != testStopChannel { + testStopChannel <- true + log.Println("STOP ...") + } - case "CANCEL": - if nil != testStopChannel { - testStopChannel <- true - log.Println("CANCEL ...") - } - default: - log.Println("UNKNOWN ...") - } - return successResponse(c, "Updated successfully.") - } + case "CANCEL": + if nil != testStopChannel { + testStopChannel <- true + log.Println("CANCEL ...") + } + default: + log.Println("UNKNOWN ...") + } + return successResponse(c, "Updated successfully.") + } } func pullUpTask(seq int64) error { - var tk = time.NewTicker(time.Millisecond * time.Duration(1500+random.Intn(800))) - defer func() { - tk.Stop() - }() - time.Sleep(time.Millisecond * 1300) - total := random.Intn(10) - n := 0 - go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false) - var started = false - for { - select { - case <-testStopChannel: - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n) - return nil - case except := <-testExceptChannel: - if except.Type != 0 { - go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true) - } - case <-tk.C: - log.Println("===") - if !started { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true, n) - started = true - } else if n >= total { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n) - return nil - } else { - n += 1 - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", true, n) - } - } - } + var tk = time.NewTicker(time.Millisecond * time.Duration(1500+random.Intn(800))) + defer func() { + tk.Stop() + }() + time.Sleep(time.Millisecond * 1300) + total := random.Intn(10) + n := 0 + go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil) + var started = false + for { + select { + case <-testStopChannel: + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n) + return nil + case except := <-testExceptChannel: + if except.Type != 0 { + go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage) + } + case <-tk.C: + log.Println("===") + if !started { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage, n) + started = true + } else if n >= total { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n) + return nil + } else { + n += 1 + go func() { + if e := pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", capturedImage, n); nil != e { + log.Println("pushEvent failed:>", e) + testStopChannel <- true + } + }() + } + } + } } func standJumpTask(seq int64) error { - var tk = time.NewTicker(time.Millisecond * 400) - defer func() { - tk.Stop() - }() - time.Sleep(time.Millisecond * 500) - go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false) - var started = false - for { - select { - case <-testStopChannel: - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, 1000+random.Intn(1000)) - return nil - case except := <-testExceptChannel: - if except.Type != 0 { - go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true) - } - case <-tk.C: - log.Println("===") - if !started { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true) - started = true - } - } - } + var tk = time.NewTicker(time.Millisecond * 400) + defer func() { + tk.Stop() + }() + time.Sleep(time.Millisecond * 500) + go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil) + var started = false + for { + select { + case <-testStopChannel: + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, 1000+random.Intn(1000)) + return nil + case except := <-testExceptChannel: + if except.Type != 0 { + go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage) + } + case <-tk.C: + log.Println("===") + if !started { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage) + started = true + } + } + } } func sitUpsTask(seq int64) error { - var tk = time.NewTicker(time.Millisecond * 500) - defer func() { - tk.Stop() - }() - time.Sleep(time.Millisecond * 600) - total := 40 + random.Intn(20) - n := 0 - go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false) - var started = false - for { - select { - case <-testStopChannel: - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n) - return nil - case except := <-testExceptChannel: - if except.Type != 0 { - go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true) - } - case <-tk.C: - log.Println("===") - if !started { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true, n) - started = true - } else if n >= total { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n) - return nil - } else { - n += 1 - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", true, n) - } - } - } + var tk = time.NewTicker(time.Millisecond * 500) + defer func() { + tk.Stop() + }() + time.Sleep(time.Millisecond * 600) + total := 40 + random.Intn(20) + n := 0 + go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil) + var started = false + for { + select { + case <-testStopChannel: + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n) + return nil + case except := <-testExceptChannel: + if except.Type != 0 { + go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage) + } + case <-tk.C: + log.Println("===") + if !started { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage, n) + started = true + } else if n >= total { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n) + return nil + } else { + n += 1 + go func() { + if e := pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", capturedImage, n); nil != e { + log.Println("pushEvent failed:>", e) + testStopChannel <- true + } + }() + } + } + } } func raceTask(seq int64) error { - var tk = time.NewTicker(time.Millisecond * 800) - defer func() { - tk.Stop() - }() - time.Sleep(time.Millisecond * 800) - go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false) - var started = false - for { - select { - case <-testStopChannel: - go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST END", true, random.Intn(40)+100) - return nil - case except := <-testExceptChannel: - if except.Type != 0 { - go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true) - } - case <-tk.C: - log.Println("===") - if !started { - go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true) - started = true - } - } - } + var tk = time.NewTicker(time.Millisecond * 800) + defer func() { + tk.Stop() + }() + time.Sleep(time.Millisecond * 800) + go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil) + var started = false + for { + select { + case <-testStopChannel: + go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST END", capturedImage, random.Intn(40)+100) + return nil + case except := <-testExceptChannel: + if except.Type != 0 { + go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage) + } + case <-tk.C: + log.Println("===") + if !started { + go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage) + started = true + } + } + } } -func pushException(url string, scene string, seq int64, tp int, desc string, img bool) { - var st = TestStat{ - Scene: scene, - Timestamp: time.Now().Unix(), - SequenceId: seq, - TestData: TestData{ - State: "EXCEPT", - ExceptionType: tp, - ExceptionDesc: desc, - }, - } - if img { - st.TestData.Image = IMAGES[random.Intn(len(IMAGES))] - } - httpPostEx(url, st, 3) +func encodeImage(img image.Image) string { + if nil == img { + return "" + } + log.Println("encodeImage:> Bounds:", img.Bounds()) + //_ = saveToFile(img) + buf := new(bytes.Buffer) + if e := jpeg.Encode(buf, img, &jpeg.Options{ + Quality: 60, + }); nil != e { + log.Println("encodeImage:> failed:", e) + } + log.Println("encodeImage:> image size:", buf.Len()) + s := base64.StdEncoding.EncodeToString(buf.Bytes()) + log.Println("encodeImage:> encoded len:", len(s)) + return s } -func pushEvent(url string, scene string, seq int64, state string, img bool, vals ...int) { - var st = TestStat{ - Scene: scene, - Timestamp: time.Now().Unix(), - SequenceId: seq, - TestData: TestData{ - State: state, - }, - } - if img { - st.TestData.Image = IMAGES1[random.Intn(len(IMAGES1))] - } - if len(vals) > 0 { - switch scene { - case "pullUp": - st.TestData.Counts = vals[0] - case "standJump": - st.TestData.DistanceCM = vals[0] - case "sitUps": - st.TestData.Counts = vals[0] - case "race": - st.TestData.DurationSec = float32(vals[0]) / 10.0 - default: +func pushException(url string, scene string, seq int64, tp int, desc string, img image.Image) { + var st = TestStat{ + Scene: scene, + Timestamp: time.Now().Unix(), + SequenceId: seq, + TestData: TestData{ + State: "EXCEPT", + ExceptionType: tp, + ExceptionDesc: desc, + }, + } + if nil != img { + st.TestData.Image = encodeImage(img) + } + httpPostEx(url, st, 3) +} + +func pushEvent(url string, scene string, seq int64, state string, img image.Image, vals ...int) error { + var st = TestStat{ + Scene: scene, + Timestamp: time.Now().Unix(), + SequenceId: seq, + TestData: TestData{ + State: state, + }, + } + if nil != img { + st.TestData.Image = encodeImage(img) + } + if len(vals) > 0 { + switch scene { + case "pullUp": + st.TestData.Counts = vals[0] + case "standJump": + st.TestData.DistanceCM = vals[0] + case "sitUps": + st.TestData.Counts = vals[0] + case "race": + st.TestData.DurationSec = float32(vals[0]) / 10.0 + default: - } - } - httpPostEx(url, st, 3) + } + } + return httpPostEx(url, st, 3) } func recogTaskProc(cmd *TestCommand) error { - testStopChannel = make(chan bool) - testExceptChannel = make(chan TestException) - random = rand.New(rand.NewSource(time.Now().UnixNano())) - if nil == currentScene { - return errors.New("invalid scene state") - } - if nil == currentScene.proc { - return errors.New("invalid scene mode processor") - } - currentTest = cmd - go func() { - defer func() { - close(testStopChannel) - close(testExceptChannel) - testStopChannel = nil - testExceptChannel = nil - currentTest = nil - }() - if e := currentScene.proc(cmd.SequenceId); nil != e { - log.Println(">>>>", e) - } - }() - go func() { - var min = testDurations[currentScene.Scene][0] - var max = testDurations[currentScene.Scene][1] - for i := 0; i < random.Intn(max-min)+min; i++ { - time.Sleep(time.Second) - } - testStopChannel <- true - }() - return nil + testStopChannel = make(chan bool) + testExceptChannel = make(chan TestException) + random = rand.New(rand.NewSource(time.Now().UnixNano())) + if nil == currentScene { + return errors.New("invalid scene state") + } + if nil == currentScene.proc { + return errors.New("invalid scene mode processor") + } + currentTest = cmd + go func() { + defer func() { + close(testStopChannel) + close(testExceptChannel) + testStopChannel = nil + testExceptChannel = nil + currentTest = nil + }() + if e := currentScene.proc(cmd.SequenceId); nil != e { + log.Println(">>>>", e) + } + }() + go func() { + var min = testDurations[currentScene.Scene][0] + var max = testDurations[currentScene.Scene][1] + for i := 0; i < random.Intn(max-min)+min; i++ { + time.Sleep(time.Second) + } + testStopChannel <- true + }() + return nil } func init() { - random = rand.New(rand.NewSource(time.Now().UnixNano())) - currentTest = nil - testExceptChannel = nil - testStopChannel = nil - testDurations = map[string][2]int{ - "pullUp": [2]int{30, 80}, - "standJump": [2]int{10, 30}, - "sitUps": [2]int{60, 70}, - "race": [2]int{9, 15}, - } + random = rand.New(rand.NewSource(time.Now().UnixNano())) + currentTest = nil + testExceptChannel = nil + testStopChannel = nil + testDurations = map[string][2]int{ + "pullUp": [2]int{30, 80}, + "standJump": [2]int{10, 30}, + "sitUps": [2]int{60, 70}, + "race": [2]int{9, 15}, + } } diff --git a/rtsp.go b/rtsp.go new file mode 100644 index 0000000..dc8dd0c --- /dev/null +++ b/rtsp.go @@ -0,0 +1,128 @@ +package main + +import ( + "errors" + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/url" + "image" + "image/jpeg" + "log" + "os" + "strconv" + "time" +) + +// This example shows how to +// 1. connect to a RTSP server and read all tracks on a path +// 2. check if there's a H264 track +// 3. decode H264 into RGBA frames +// 4. encode the frames into JPEG images and save them on disk + +// This example requires the ffmpeg libraries, that can be installed in this way: +// apt install -y libavformat-dev libswscale-dev gcc pkg-config + +func playRtsp(c *gortsplib.Client, u *url.URL, saveImage func(image.Image)) error { + if tracks, baseUrl, _, e := c.Describe(u); nil != e { + return e + } else { + h264TrackID, h264track := func() (int, *gortsplib.TrackH264) { + for i, track := range tracks { + if h264track, ok := track.(*gortsplib.TrackH264); ok { + return i, h264track + } + } + return -1, nil + }() + if h264TrackID < 0 { + return errors.New("h264 track not found") + } + // setup H264->raw frames decoder + h264dec, err := newH264Decoder() + if err != nil { + panic(err) + } + defer h264dec.close() + + // if present, send SPS and PPS from the SDP to the decoder + sps := h264track.SafeSPS() + if sps != nil { + h264dec.decode(sps) + } + pps := h264track.SafePPS() + if pps != nil { + h264dec.decode(pps) + } + + // called when a RTP packet arrives + if nil != saveImage { + log.Println("setupRtsp:> enabled saveImage") + c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) { + + if ctx.TrackID != h264TrackID { + //log.Println("x") + return + } + + if ctx.H264NALUs == nil { + //log.Println("y") + return + } + + for _, nalu := range ctx.H264NALUs { + // convert H264 NALUs to RGBA frames + img, err := h264dec.decode(nalu) + if err != nil { + panic(err) + } + + // wait for a frame + if img == nil { + //log.Println("z") + continue + } else { + //log.Println("...") + saveImage(img) + } + // convert frame to JPEG and save to file + + } + } + } + + // setup and read all tracks + err = c.SetupAndPlay(tracks, baseUrl) + if err != nil { + return err + } + // wait until a fatal error + return c.Wait() + } +} + +func setupRtsp(u string) (*gortsplib.Client, *url.URL, error) { + var c = &gortsplib.Client{} + if u, e := url.Parse(u); nil != e { + return nil, nil, e + } else if e := c.Start(u.Scheme, u.Host); nil != e { + return nil, nil, e + } else { + return c, u, nil + } +} + +func saveToFile(img image.Image) error { + // create file + fname := strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10) + ".jpg" + f, err := os.Create(fname) + if err != nil { + panic(err) + } + defer f.Close() + + log.Println("saving", fname) + + // convert to jpeg + return jpeg.Encode(f, img, &jpeg.Options{ + Quality: 60, + }) +} diff --git a/scene.go b/scene.go index 15d6e5c..d4ca533 100644 --- a/scene.go +++ b/scene.go @@ -1,181 +1,205 @@ package main import ( - "errors" - "github.com/gofiber/fiber/v2" - "log" - "net/url" - "time" + "errors" + "github.com/aler9/gortsplib" + "github.com/gofiber/fiber/v2" + "image" + "log" + "net/url" + "time" ) type SceneCommand struct { - Scene string `json:"scene"` - PushUrl string `json:"pushUrl"` - CameraUrls []string `json:"cameraURls"` - proc func(int64) error - areaConfigs []AreaConfig `json:"-"` + Scene string `json:"scene"` + PushUrl string `json:"pushUrl"` + CameraUrls []string `json:"cameraURls"` + proc func(int64) error + areaConfigs []AreaConfig `json:"-"` } type SceneData struct { - State string `json:"state"` - LiveUrl []string `json:"liveUrl"` + State string `json:"state"` + LiveUrl []string `json:"liveUrl"` } type Pointer struct { - Id string `json:"id"` - PercentX float32 `json:"percentX"` - PercentY float32 `json:"percentY"` + Id string `json:"id"` + PercentX float32 `json:"percentX"` + PercentY float32 `json:"percentY"` } type AreaConfig struct { - AreaId string `json:"areaId"` - PointersNum int `json:"pointersNum"` - Pointers []Pointer `json:"pointers"` + AreaId string `json:"areaId"` + PointersNum int `json:"pointersNum"` + Pointers []Pointer `json:"pointers"` } type SceneReadyResponse struct { - Scene string `json:"scene"` - Timestamp int64 `json:"timestamp"` - SceneData SceneData `json:"sceneData"` + Scene string `json:"scene"` + Timestamp int64 `json:"timestamp"` + SceneData SceneData `json:"sceneData"` } var currentScene *SceneCommand var sceneStopChannel chan bool func validateSceneCmd(cmd SceneCommand) error { - switch cmd.Scene { - case "pullUp", "standJump", "sitUps": - if len(cmd.CameraUrls) < 1 { - return errors.New("cameraUrls can not less than 1") - } - case "race": - if len(cmd.CameraUrls) < 2 { - return errors.New("cameraUrls can not less than 2") - } - default: - return errors.New("unknown scene type:" + cmd.Scene) - } - if u, e := url.Parse(cmd.PushUrl); nil != e { - return errors.New("invalid pushUrl:" + e.Error()) - } else if u.Scheme != "http" && u.Scheme != "https" { - return errors.New("can not support pushUrl with " + u.Scheme) - } - return nil + switch cmd.Scene { + case "pullUp", "standJump", "sitUps": + if len(cmd.CameraUrls) < 1 { + return errors.New("cameraUrls can not less than 1") + } + case "race": + if len(cmd.CameraUrls) < 2 { + return errors.New("cameraUrls can not less than 2") + } + default: + return errors.New("unknown scene type:" + cmd.Scene) + } + if u, e := url.Parse(cmd.PushUrl); nil != e { + return errors.New("invalid pushUrl:" + e.Error()) + } else if u.Scheme != "http" && u.Scheme != "https" { + return errors.New("can not support pushUrl with " + u.Scheme) + } + return nil } func setScene(c *fiber.Ctx) error { - var sceneCmd SceneCommand - if err := c.BodyParser(&sceneCmd); nil != err { - log.Println("ERROR:", c.Body()) - return failureResponse(c, "-1", err.Error(), 200) - } else if nil != currentScene { - return failureResponse(c, "1", "A working scene is started", 200) - } else if err = validateSceneCmd(sceneCmd); nil != err { - return failureResponse(c, "2", err.Error(), 200) - } else { - // currentScene = &sceneCmd - log.Println("Starting scene ...") - log.Println(sceneCmd) - if e := sceneTaskProc(&sceneCmd); nil != e { - return failureResponse(c, "3", err.Error(), 200) - } else { - return successResponse(c, "Updated successfully.") - } - } + var sceneCmd SceneCommand + if err := c.BodyParser(&sceneCmd); nil != err { + log.Println("ERROR:", c.Body()) + return failureResponse(c, "-1", err.Error(), 200) + } else if nil != currentScene { + return failureResponse(c, "1", "A working scene is started", 200) + } else if err = validateSceneCmd(sceneCmd); nil != err { + return failureResponse(c, "2", err.Error(), 200) + } else { + // currentScene = &sceneCmd + log.Println("Starting scene ...") + log.Println(sceneCmd) + if e := sceneTaskProc(&sceneCmd); nil != e { + return failureResponse(c, "3", err.Error(), 200) + } else { + return successResponse(c, "Updated successfully.") + } + } } func setAreaConfig(c *fiber.Ctx) error { - var cfg []AreaConfig - if e := c.BodyParser(&cfg); nil != e { - log.Println("ERROR:", c.Body()) - return failureResponse(c, "-1", e.Error(), 200) - } else if nil == currentScene { - return failureResponse(c, "1", "No working scene is started", 200) - } - currentScene.areaConfigs = cfg - return successResponse(c, "Updated successfully.") + var cfg []AreaConfig + if e := c.BodyParser(&cfg); nil != e { + log.Println("ERROR:", c.Body()) + return failureResponse(c, "-1", e.Error(), 200) + } else if nil == currentScene { + return failureResponse(c, "1", "No working scene is started", 200) + } + currentScene.areaConfigs = cfg + return successResponse(c, "Updated successfully.") } func getAreaConfig(c *fiber.Ctx) error { - if nil == currentScene { - return failureResponse(c, "1", "No working scene is started", 200) - } else { - return successResponseData(c, "success", currentScene.areaConfigs) - } + if nil == currentScene { + return failureResponse(c, "1", "No working scene is started", 200) + } else { + return successResponseData(c, "success", currentScene.areaConfigs) + } } func stopScene(c *fiber.Ctx) error { - if nil != testStopChannel { - testStopChannel <- true - } - if nil != sceneStopChannel { - sceneStopChannel <- true - } - return successResponse(c, "Updated successfully.") + if nil != testStopChannel { + testStopChannel <- true + } + if nil != sceneStopChannel { + sceneStopChannel <- true + } + return successResponse(c, "Updated successfully.") } func getStatus(c *fiber.Ctx) error { - var scene = "" - var seq int64 = 0 - if nil != currentScene { - scene = currentScene.Scene - } - if nil != currentTest { - seq = currentTest.SequenceId - } - return successResponseEx(c, "success", scene, seq) + var scene = "" + var seq int64 = 0 + if nil != currentScene { + scene = currentScene.Scene + } + if nil != currentTest { + seq = currentTest.SequenceId + } + return successResponseEx(c, "success", scene, seq) } +var capturedImage image.Image + func sceneTaskProc(cmd *SceneCommand) error { - var f func(int64) error = nil - switch cmd.Scene { - case "pullUp": - f = pullUpTask - case "standJump": - f = standJumpTask - case "sitUps": - f = sitUpsTask - case "race": - f = raceTask - default: - return errors.New("unknown scene type:" + cmd.Scene) - } - sceneStopChannel = make(chan bool) - var tk = time.NewTicker(time.Second * 3) - go func() { - defer func() { - tk.Stop() - close(sceneStopChannel) - sceneStopChannel = nil - }() - currentScene = cmd - currentScene.proc = f - time.Sleep(time.Second) - var st = SceneReadyResponse{ - Scene: cmd.Scene, - Timestamp: time.Now().Unix(), - SceneData: SceneData{ - State: "SCENE READY", - LiveUrl: cmd.CameraUrls, - }, - } - go httpPostEx(cmd.PushUrl, st, 3) - for { - select { - case <-tk.C: - // log.Println(" ...") - case <-sceneStopChannel: - log.Println("Scene Stopping...") - go pushEvent(cmd.PushUrl, cmd.Scene, 0, "SCENE END", false) - currentScene = nil - return - } - } - }() - return nil + var f func(int64) error = nil + switch cmd.Scene { + case "pullUp": + f = pullUpTask + case "standJump": + f = standJumpTask + case "sitUps": + f = sitUpsTask + case "race": + f = raceTask + default: + return errors.New("unknown scene type:" + cmd.Scene) + } + if len(cmd.CameraUrls) < 1 { + return errors.New("no camera specified") + } + var cc *gortsplib.Client + if c, u, e := setupRtsp(cmd.CameraUrls[0]); nil != e { + log.Println("setupRtsp failed:", e) + } else { + cc = c + go func() { + if e := playRtsp(c, u, func(img image.Image) { + //log.Println("...capturedImage:>", img.Bounds()) + capturedImage = img + }); nil != e { + //log.Println("playRtsp failed:>", e) + } + }() + } + sceneStopChannel = make(chan bool) + var tk = time.NewTicker(time.Second * 3) + go func() { + defer func() { + tk.Stop() + close(sceneStopChannel) + if nil != cc { + _ = cc.Close() + } + sceneStopChannel = nil + }() + currentScene = cmd + currentScene.proc = f + time.Sleep(time.Second) + var st = SceneReadyResponse{ + Scene: cmd.Scene, + Timestamp: time.Now().Unix(), + SceneData: SceneData{ + State: "SCENE READY", + LiveUrl: cmd.CameraUrls, + }, + } + go httpPostEx(cmd.PushUrl, st, 3) + for { + select { + case <-tk.C: + // log.Println(" ...") + case <-sceneStopChannel: + log.Println("Scene Stopping...") + go pushEvent(cmd.PushUrl, cmd.Scene, 0, "SCENE END", nil) + currentScene = nil + return + } + } + }() + return nil } func init() { - currentScene = nil - sceneStopChannel = nil + currentScene = nil + sceneStopChannel = nil }