Browse Source

Enabled rtsp capture.

develop
Mingcai SHEN 2 years ago
parent
commit
527cf35bba
9 changed files with 793 additions and 451 deletions
  1. +14
    -0
      Makefile
  2. +5
    -2
      go.mod
  3. +0
    -1
      go.sum
  4. +140
    -0
      h264decoder.go
  5. +33
    -31
      http.go
  6. +37
    -37
      main.go
  7. +277
    -245
      recog.go
  8. +128
    -0
      rtsp.go
  9. +159
    -135
      scene.go

+ 14
- 0
Makefile View File

@ -0,0 +1,14 @@
GOSRCS := $(shell find . -name "*.go")
all: sport_rec_demo
clean:
@rm -f sport_rec_demo
run: sport_rec_demo
@echo "Starting ..."
@./sport_rec_demo serve -L :8080
sport_rec_demo: $(GOSRCS) Makefile
@echo "Building $@ ..."
@go build $@

+ 5
- 2
go.mod View File

@ -2,17 +2,20 @@ module sport_rec_demo
go 1.18
require (
github.com/gofiber/fiber/v2 v2.30.0
github.com/spf13/cobra v1.4.0
)
require (
github.com/aler9/gortsplib v0.0.0-20220829125132-e99f799c07e7 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/gofiber/fiber/v2 v2.30.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.15.0 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.9 // indirect
github.com/pion/rtp v1.7.13 // indirect
github.com/pion/sdp/v3 v3.0.5 // indirect
github.com/spf13/cobra v1.4.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect

+ 0
- 1
go.sum View File

@ -42,7 +42,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

+ 140
- 0
h264decoder.go View File

@ -0,0 +1,140 @@
package main
import (
"fmt"
"image"
"unsafe"
)
// #cgo pkg-config: libavcodec libavutil libswscale
// #include <libavcodec/avcodec.h>
// #include <libavutil/imgutils.h>
// #include <libswscale/swscale.h>
import "C"
func frameData(frame *C.AVFrame) **C.uint8_t {
return (**C.uint8_t)(unsafe.Pointer(&frame.data[0]))
}
func frameLineSize(frame *C.AVFrame) *C.int {
return (*C.int)(unsafe.Pointer(&frame.linesize[0]))
}
// h264Decoder is a wrapper around ffmpeg's H264 decoder.
type h264Decoder struct {
codecCtx *C.AVCodecContext
srcFrame *C.AVFrame
swsCtx *C.struct_SwsContext
dstFrame *C.AVFrame
dstFramePtr []uint8
}
// newH264Decoder allocates a new h264Decoder.
func newH264Decoder() (*h264Decoder, error) {
codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H264)
if codec == nil {
return nil, fmt.Errorf("avcodec_find_decoder() failed")
}
codecCtx := C.avcodec_alloc_context3(codec)
if codecCtx == nil {
return nil, fmt.Errorf("avcodec_alloc_context3() failed")
}
res := C.avcodec_open2(codecCtx, codec, nil)
if res < 0 {
C.avcodec_close(codecCtx)
return nil, fmt.Errorf("avcodec_open2() failed")
}
srcFrame := C.av_frame_alloc()
if srcFrame == nil {
C.avcodec_close(codecCtx)
return nil, fmt.Errorf("av_frame_alloc() failed")
}
return &h264Decoder{
codecCtx: codecCtx,
srcFrame: srcFrame,
}, nil
}
// close closes the decoder.
func (d *h264Decoder) close() {
if d.dstFrame != nil {
C.av_frame_free(&d.dstFrame)
}
if d.swsCtx != nil {
C.sws_freeContext(d.swsCtx)
}
C.av_frame_free(&d.srcFrame)
C.avcodec_close(d.codecCtx)
}
func (d *h264Decoder) decode(nalu []byte) (image.Image, error) {
nalu = append([]uint8{0x00, 0x00, 0x00, 0x01}, []uint8(nalu)...)
// send frame to decoder
var avPacket C.AVPacket
avPacket.data = (*C.uint8_t)(C.CBytes(nalu))
defer C.free(unsafe.Pointer(avPacket.data))
avPacket.size = C.int(len(nalu))
res := C.avcodec_send_packet(d.codecCtx, &avPacket)
if res < 0 {
return nil, nil
}
// receive frame if available
res = C.avcodec_receive_frame(d.codecCtx, d.srcFrame)
if res < 0 {
return nil, nil
}
// if frame size has changed, allocate needed objects
if d.dstFrame == nil || d.dstFrame.width != d.srcFrame.width || d.dstFrame.height != d.srcFrame.height {
if d.dstFrame != nil {
C.av_frame_free(&d.dstFrame)
}
if d.swsCtx != nil {
C.sws_freeContext(d.swsCtx)
}
d.dstFrame = C.av_frame_alloc()
d.dstFrame.format = C.AV_PIX_FMT_RGBA
d.dstFrame.width = d.srcFrame.width
d.dstFrame.height = d.srcFrame.height
d.dstFrame.color_range = C.AVCOL_RANGE_JPEG
res = C.av_frame_get_buffer(d.dstFrame, 1)
if res < 0 {
return nil, fmt.Errorf("av_frame_get_buffer() err")
}
d.swsCtx = C.sws_getContext(d.srcFrame.width, d.srcFrame.height, C.AV_PIX_FMT_YUV420P,
d.dstFrame.width, d.dstFrame.height, (int32)(d.dstFrame.format), C.SWS_BILINEAR, nil, nil, nil)
if d.swsCtx == nil {
return nil, fmt.Errorf("sws_getContext() err")
}
dstFrameSize := C.av_image_get_buffer_size((int32)(d.dstFrame.format), d.dstFrame.width, d.dstFrame.height, 1)
d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize]
}
// convert frame from YUV420 to RGB
res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame),
0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame))
if res < 0 {
return nil, fmt.Errorf("sws_scale() err")
}
// embed frame into an image.Image
return &image.RGBA{
Pix: d.dstFramePtr,
Stride: 4 * (int)(d.dstFrame.width),
Rect: image.Rectangle{
Max: image.Point{(int)(d.dstFrame.width), (int)(d.dstFrame.height)},
},
}, nil
}

+ 33
- 31
http.go View File

@ -1,49 +1,51 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"log"
"net/http"
"time"
"bytes"
"encoding/json"
"errors"
"log"
"net/http"
"time"
)
var httpClient *http.Client
var fakeClient = false
func httpPost(url string, data interface{}) error {
log.Println("httpPost:url>", url)
if fakeClient {
log.Println("httpPost:data> fake client ")
return nil
}
if bs, e := json.Marshal(data); nil != e {
return e
} else if resp, e := httpClient.Post(url, "application/json", bytes.NewBuffer(bs)); nil != e {
return e
} else if resp.StatusCode != http.StatusOK {
return errors.New("invalid resp status: " + resp.Status)
} else {
return nil
}
log.Println("httpPost:url>", url)
if fakeClient {
log.Println("httpPost:data> fake client ")
return nil
}
if bs, e := json.Marshal(data); nil != e {
return e
} else if resp, e := httpClient.Post(url, "application/json", bytes.NewBuffer(bs)); nil != e {
return e
} else if resp.StatusCode != http.StatusOK {
return errors.New("invalid resp status: " + resp.Status)
} else {
return nil
}
}
func httpGet(url string, resp interface{}) error {
return nil
return nil
}
func httpPostEx(url string, data interface{}, retries int) {
for i := 0; i < retries; i++ {
if e := httpPost(url, data); nil != e {
log.Println(">>>", e)
time.Sleep(3 * time.Second)
} else {
return
}
}
func httpPostEx(url string, data interface{}, retries int) error {
var err error
for i := 0; i < retries; i++ {
if err = httpPost(url, data); nil != err {
log.Println(">>>", err)
time.Sleep(3 * time.Second)
} else {
return nil
}
}
return err
}
func init() {
httpClient = http.DefaultClient
httpClient = http.DefaultClient
}

+ 37
- 37
main.go View File

@ -1,26 +1,26 @@
package main
import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/spf13/cobra"
"log"
"os"
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/spf13/cobra"
"log"
"os"
)
var listenAddr = ":3000"
var rootCmd = &cobra.Command{
Use: "sport_rec",
Short: "Sport Recognition Demo",
Long: "Sport Recognition Demo",
Use: "sport_rec",
Short: "Sport Recognition Demo",
Long: "Sport Recognition Demo",
}
func init() {
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVarP(&listenAddr, "listen", "L", ":3000", "Service listen address")
rootCmd.PersistentFlags().BoolVarP(&fakeClient, "fake", "F", false, "Fake HTP client instead of pushing")
rootCmd.AddCommand(serveCmd)
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVarP(&listenAddr, "listen", "L", ":3000", "Service listen address")
rootCmd.PersistentFlags().BoolVarP(&fakeClient, "fake", "F", false, "Fake HTP client instead of pushing")
rootCmd.AddCommand(serveCmd)
}
func initConfig() {
@ -28,32 +28,32 @@ func initConfig() {
}
var serveCmd = &cobra.Command{
Use: "run",
Short: "Run service",
Run: func(cmd *cobra.Command, args []string) {
app := fiber.New()
app.Get("/", func(c *fiber.Ctx) error {
return c.SendString("Hello, World!")
})
app.Post("/api/v1/setScene", setScene)
app.Post("/api/v1/setTestCmd", setTestCmd)
// app.Post("/api/v1/eventHandle")
app.Post("/api/v1/areaConfig", setAreaConfig)
app.Get("/api/v1/areaConfig", getAreaConfig)
app.Post("/api/v1/stopScene", stopScene)
app.Get("/api/v1/stopScene", stopScene)
app.Get("/api/v1/getStatus", getStatus)
app.Get("/api/v1/getTestStatus", getStatus)
log.Fatal(app.Listen(listenAddr))
},
Use: "serve",
Short: "Run service",
Run: func(cmd *cobra.Command, args []string) {
app := fiber.New()
app.Get("/", func(c *fiber.Ctx) error {
return c.SendString("Hello, World!")
})
app.Post("/api/v1/setScene", setScene)
app.Post("/api/v1/setTestCmd", setTestCmd)
// app.Post("/api/v1/eventHandle")
app.Post("/api/v1/areaConfig", setAreaConfig)
app.Get("/api/v1/areaConfig", getAreaConfig)
app.Post("/api/v1/stopScene", stopScene)
app.Get("/api/v1/stopScene", stopScene)
app.Get("/api/v1/getStatus", getStatus)
app.Get("/api/v1/getTestStatus", getStatus)
log.Fatal(app.Listen(listenAddr))
},
}
func main() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

+ 277
- 245
recog.go View File

@ -1,38 +1,42 @@
package main
import (
"errors"
"github.com/gofiber/fiber/v2"
"log"
"math/rand"
"time"
"bytes"
"encoding/base64"
"errors"
"github.com/gofiber/fiber/v2"
"image"
"image/jpeg"
"log"
"math/rand"
"time"
)
type TestCommand struct {
Command string `json:"cmd"`
SequenceId int64 `json:"sequenceId"`
Command string `json:"cmd"`
SequenceId int64 `json:"sequenceId"`
}
type TestData struct {
State string `json:"state"`
ExceptionType int `json:"exceptionType,omitempty"`
ExceptionDesc string `json:"exceptionDesc,omitempty"`
Image string `json:"image,omitempty"`
DistanceCM int `json:"distanceCM,omitempty"`
Counts int `json:"counts,omitempty"`
DurationSec float32 `json:"durationSec,omitempty"`
State string `json:"state"`
ExceptionType int `json:"exceptionType,omitempty"`
ExceptionDesc string `json:"exceptionDesc,omitempty"`
Image string `json:"image,omitempty"`
DistanceCM int `json:"distanceCM,omitempty"`
Counts int `json:"counts,omitempty"`
DurationSec float32 `json:"durationSec,omitempty"`
}
type TestStat struct {
Scene string `json:"scene"`
Timestamp int64 `json:"timestamp"`
SequenceId int64 `json:"sequenceId"`
TestData TestData `json:"sceneData"`
Scene string `json:"scene"`
Timestamp int64 `json:"timestamp"`
SequenceId int64 `json:"sequenceId"`
TestData TestData `json:"sceneData"`
}
type TestException struct {
Type int
Desc string
Type int
Desc string
}
var currentTest *TestCommand
@ -43,253 +47,281 @@ var testStopChannel chan bool
var testDurations map[string][2]int
func setTestCmd(c *fiber.Ctx) error {
var testCmd TestCommand
if err := c.BodyParser(&testCmd); nil != err {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", err.Error(), 200)
} else if nil == currentScene {
return failureResponse(c, "1", "not in a working scene", 200)
} else {
switch testCmd.Command {
case "START":
log.Println("START ...")
if nil != currentTest {
return failureResponse(c, "2", "A test already started", 200)
}
if e := recogTaskProc(&testCmd); nil != e {
return failureResponse(c, "3", e.Error(), 200)
}
case "STOP":
if nil != testStopChannel {
testStopChannel <- true
log.Println("STOP ...")
}
var testCmd TestCommand
if err := c.BodyParser(&testCmd); nil != err {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", err.Error(), 200)
} else if nil == currentScene {
return failureResponse(c, "1", "not in a working scene", 200)
} else {
switch testCmd.Command {
case "START":
log.Println("START ...")
if nil != currentTest {
return failureResponse(c, "2", "A test already started", 200)
}
if e := recogTaskProc(&testCmd); nil != e {
return failureResponse(c, "3", e.Error(), 200)
}
case "STOP":
if nil != testStopChannel {
testStopChannel <- true
log.Println("STOP ...")
}
case "CANCEL":
if nil != testStopChannel {
testStopChannel <- true
log.Println("CANCEL ...")
}
default:
log.Println("UNKNOWN ...")
}
return successResponse(c, "Updated successfully.")
}
case "CANCEL":
if nil != testStopChannel {
testStopChannel <- true
log.Println("CANCEL ...")
}
default:
log.Println("UNKNOWN ...")
}
return successResponse(c, "Updated successfully.")
}
}
func pullUpTask(seq int64) error {
var tk = time.NewTicker(time.Millisecond * time.Duration(1500+random.Intn(800)))
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 1300)
total := random.Intn(10)
n := 0
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true, n)
started = true
} else if n >= total {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n)
return nil
} else {
n += 1
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", true, n)
}
}
}
var tk = time.NewTicker(time.Millisecond * time.Duration(1500+random.Intn(800)))
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 1300)
total := random.Intn(10)
n := 0
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage, n)
started = true
} else if n >= total {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n)
return nil
} else {
n += 1
go func() {
if e := pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", capturedImage, n); nil != e {
log.Println("pushEvent failed:>", e)
testStopChannel <- true
}
}()
}
}
}
}
func standJumpTask(seq int64) error {
var tk = time.NewTicker(time.Millisecond * 400)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 500)
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, 1000+random.Intn(1000))
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true)
started = true
}
}
}
var tk = time.NewTicker(time.Millisecond * 400)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 500)
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, 1000+random.Intn(1000))
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage)
started = true
}
}
}
}
func sitUpsTask(seq int64) error {
var tk = time.NewTicker(time.Millisecond * 500)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 600)
total := 40 + random.Intn(20)
n := 0
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true, n)
started = true
} else if n >= total {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", true, n)
return nil
} else {
n += 1
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", true, n)
}
}
}
var tk = time.NewTicker(time.Millisecond * 500)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 600)
total := 40 + random.Intn(20)
n := 0
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage, n)
started = true
} else if n >= total {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "TEST END", capturedImage, n)
return nil
} else {
n += 1
go func() {
if e := pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "COUNT", capturedImage, n); nil != e {
log.Println("pushEvent failed:>", e)
testStopChannel <- true
}
}()
}
}
}
}
func raceTask(seq int64) error {
var tk = time.NewTicker(time.Millisecond * 800)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 800)
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", false)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST END", true, random.Intn(40)+100)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, true)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", true)
started = true
}
}
}
var tk = time.NewTicker(time.Millisecond * 800)
defer func() {
tk.Stop()
}()
time.Sleep(time.Millisecond * 800)
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST READY", nil)
var started = false
for {
select {
case <-testStopChannel:
go pushEvent(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, "TEST END", capturedImage, random.Intn(40)+100)
return nil
case except := <-testExceptChannel:
if except.Type != 0 {
go pushException(currentScene.PushUrl, currentScene.Scene, currentTest.SequenceId, except.Type, except.Desc, capturedImage)
}
case <-tk.C:
log.Println("===")
if !started {
go pushEvent(currentScene.PushUrl, currentScene.Scene, seq, "START", capturedImage)
started = true
}
}
}
}
func pushException(url string, scene string, seq int64, tp int, desc string, img bool) {
var st = TestStat{
Scene: scene,
Timestamp: time.Now().Unix(),
SequenceId: seq,
TestData: TestData{
State: "EXCEPT",
ExceptionType: tp,
ExceptionDesc: desc,
},
}
if img {
st.TestData.Image = IMAGES[random.Intn(len(IMAGES))]
}
httpPostEx(url, st, 3)
func encodeImage(img image.Image) string {
if nil == img {
return ""
}
log.Println("encodeImage:> Bounds:", img.Bounds())
//_ = saveToFile(img)
buf := new(bytes.Buffer)
if e := jpeg.Encode(buf, img, &jpeg.Options{
Quality: 60,
}); nil != e {
log.Println("encodeImage:> failed:", e)
}
log.Println("encodeImage:> image size:", buf.Len())
s := base64.StdEncoding.EncodeToString(buf.Bytes())
log.Println("encodeImage:> encoded len:", len(s))
return s
}
func pushEvent(url string, scene string, seq int64, state string, img bool, vals ...int) {
var st = TestStat{
Scene: scene,
Timestamp: time.Now().Unix(),
SequenceId: seq,
TestData: TestData{
State: state,
},
}
if img {
st.TestData.Image = IMAGES1[random.Intn(len(IMAGES1))]
}
if len(vals) > 0 {
switch scene {
case "pullUp":
st.TestData.Counts = vals[0]
case "standJump":
st.TestData.DistanceCM = vals[0]
case "sitUps":
st.TestData.Counts = vals[0]
case "race":
st.TestData.DurationSec = float32(vals[0]) / 10.0
default:
func pushException(url string, scene string, seq int64, tp int, desc string, img image.Image) {
var st = TestStat{
Scene: scene,
Timestamp: time.Now().Unix(),
SequenceId: seq,
TestData: TestData{
State: "EXCEPT",
ExceptionType: tp,
ExceptionDesc: desc,
},
}
if nil != img {
st.TestData.Image = encodeImage(img)
}
httpPostEx(url, st, 3)
}
func pushEvent(url string, scene string, seq int64, state string, img image.Image, vals ...int) error {
var st = TestStat{
Scene: scene,
Timestamp: time.Now().Unix(),
SequenceId: seq,
TestData: TestData{
State: state,
},
}
if nil != img {
st.TestData.Image = encodeImage(img)
}
if len(vals) > 0 {
switch scene {
case "pullUp":
st.TestData.Counts = vals[0]
case "standJump":
st.TestData.DistanceCM = vals[0]
case "sitUps":
st.TestData.Counts = vals[0]
case "race":
st.TestData.DurationSec = float32(vals[0]) / 10.0
default:
}
}
httpPostEx(url, st, 3)
}
}
return httpPostEx(url, st, 3)
}
func recogTaskProc(cmd *TestCommand) error {
testStopChannel = make(chan bool)
testExceptChannel = make(chan TestException)
random = rand.New(rand.NewSource(time.Now().UnixNano()))
if nil == currentScene {
return errors.New("invalid scene state")
}
if nil == currentScene.proc {
return errors.New("invalid scene mode processor")
}
currentTest = cmd
go func() {
defer func() {
close(testStopChannel)
close(testExceptChannel)
testStopChannel = nil
testExceptChannel = nil
currentTest = nil
}()
if e := currentScene.proc(cmd.SequenceId); nil != e {
log.Println(">>>>", e)
}
}()
go func() {
var min = testDurations[currentScene.Scene][0]
var max = testDurations[currentScene.Scene][1]
for i := 0; i < random.Intn(max-min)+min; i++ {
time.Sleep(time.Second)
}
testStopChannel <- true
}()
return nil
testStopChannel = make(chan bool)
testExceptChannel = make(chan TestException)
random = rand.New(rand.NewSource(time.Now().UnixNano()))
if nil == currentScene {
return errors.New("invalid scene state")
}
if nil == currentScene.proc {
return errors.New("invalid scene mode processor")
}
currentTest = cmd
go func() {
defer func() {
close(testStopChannel)
close(testExceptChannel)
testStopChannel = nil
testExceptChannel = nil
currentTest = nil
}()
if e := currentScene.proc(cmd.SequenceId); nil != e {
log.Println(">>>>", e)
}
}()
go func() {
var min = testDurations[currentScene.Scene][0]
var max = testDurations[currentScene.Scene][1]
for i := 0; i < random.Intn(max-min)+min; i++ {
time.Sleep(time.Second)
}
testStopChannel <- true
}()
return nil
}
func init() {
random = rand.New(rand.NewSource(time.Now().UnixNano()))
currentTest = nil
testExceptChannel = nil
testStopChannel = nil
testDurations = map[string][2]int{
"pullUp": [2]int{30, 80},
"standJump": [2]int{10, 30},
"sitUps": [2]int{60, 70},
"race": [2]int{9, 15},
}
random = rand.New(rand.NewSource(time.Now().UnixNano()))
currentTest = nil
testExceptChannel = nil
testStopChannel = nil
testDurations = map[string][2]int{
"pullUp": [2]int{30, 80},
"standJump": [2]int{10, 30},
"sitUps": [2]int{60, 70},
"race": [2]int{9, 15},
}
}

+ 128
- 0
rtsp.go View File

@ -0,0 +1,128 @@
package main
import (
"errors"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/url"
"image"
"image/jpeg"
"log"
"os"
"strconv"
"time"
)
// This example shows how to
// 1. connect to a RTSP server and read all tracks on a path
// 2. check if there's a H264 track
// 3. decode H264 into RGBA frames
// 4. encode the frames into JPEG images and save them on disk
// This example requires the ffmpeg libraries, that can be installed in this way:
// apt install -y libavformat-dev libswscale-dev gcc pkg-config
func playRtsp(c *gortsplib.Client, u *url.URL, saveImage func(image.Image)) error {
if tracks, baseUrl, _, e := c.Describe(u); nil != e {
return e
} else {
h264TrackID, h264track := func() (int, *gortsplib.TrackH264) {
for i, track := range tracks {
if h264track, ok := track.(*gortsplib.TrackH264); ok {
return i, h264track
}
}
return -1, nil
}()
if h264TrackID < 0 {
return errors.New("h264 track not found")
}
// setup H264->raw frames decoder
h264dec, err := newH264Decoder()
if err != nil {
panic(err)
}
defer h264dec.close()
// if present, send SPS and PPS from the SDP to the decoder
sps := h264track.SafeSPS()
if sps != nil {
h264dec.decode(sps)
}
pps := h264track.SafePPS()
if pps != nil {
h264dec.decode(pps)
}
// called when a RTP packet arrives
if nil != saveImage {
log.Println("setupRtsp:> enabled saveImage")
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
if ctx.TrackID != h264TrackID {
//log.Println("x")
return
}
if ctx.H264NALUs == nil {
//log.Println("y")
return
}
for _, nalu := range ctx.H264NALUs {
// convert H264 NALUs to RGBA frames
img, err := h264dec.decode(nalu)
if err != nil {
panic(err)
}
// wait for a frame
if img == nil {
//log.Println("z")
continue
} else {
//log.Println("...")
saveImage(img)
}
// convert frame to JPEG and save to file
}
}
}
// setup and read all tracks
err = c.SetupAndPlay(tracks, baseUrl)
if err != nil {
return err
}
// wait until a fatal error
return c.Wait()
}
}
func setupRtsp(u string) (*gortsplib.Client, *url.URL, error) {
var c = &gortsplib.Client{}
if u, e := url.Parse(u); nil != e {
return nil, nil, e
} else if e := c.Start(u.Scheme, u.Host); nil != e {
return nil, nil, e
} else {
return c, u, nil
}
}
func saveToFile(img image.Image) error {
// create file
fname := strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10) + ".jpg"
f, err := os.Create(fname)
if err != nil {
panic(err)
}
defer f.Close()
log.Println("saving", fname)
// convert to jpeg
return jpeg.Encode(f, img, &jpeg.Options{
Quality: 60,
})
}

+ 159
- 135
scene.go View File

@ -1,181 +1,205 @@
package main
import (
"errors"
"github.com/gofiber/fiber/v2"
"log"
"net/url"
"time"
"errors"
"github.com/aler9/gortsplib"
"github.com/gofiber/fiber/v2"
"image"
"log"
"net/url"
"time"
)
type SceneCommand struct {
Scene string `json:"scene"`
PushUrl string `json:"pushUrl"`
CameraUrls []string `json:"cameraURls"`
proc func(int64) error
areaConfigs []AreaConfig `json:"-"`
Scene string `json:"scene"`
PushUrl string `json:"pushUrl"`
CameraUrls []string `json:"cameraURls"`
proc func(int64) error
areaConfigs []AreaConfig `json:"-"`
}
type SceneData struct {
State string `json:"state"`
LiveUrl []string `json:"liveUrl"`
State string `json:"state"`
LiveUrl []string `json:"liveUrl"`
}
type Pointer struct {
Id string `json:"id"`
PercentX float32 `json:"percentX"`
PercentY float32 `json:"percentY"`
Id string `json:"id"`
PercentX float32 `json:"percentX"`
PercentY float32 `json:"percentY"`
}
type AreaConfig struct {
AreaId string `json:"areaId"`
PointersNum int `json:"pointersNum"`
Pointers []Pointer `json:"pointers"`
AreaId string `json:"areaId"`
PointersNum int `json:"pointersNum"`
Pointers []Pointer `json:"pointers"`
}
type SceneReadyResponse struct {
Scene string `json:"scene"`
Timestamp int64 `json:"timestamp"`
SceneData SceneData `json:"sceneData"`
Scene string `json:"scene"`
Timestamp int64 `json:"timestamp"`
SceneData SceneData `json:"sceneData"`
}
var currentScene *SceneCommand
var sceneStopChannel chan bool
func validateSceneCmd(cmd SceneCommand) error {
switch cmd.Scene {
case "pullUp", "standJump", "sitUps":
if len(cmd.CameraUrls) < 1 {
return errors.New("cameraUrls can not less than 1")
}
case "race":
if len(cmd.CameraUrls) < 2 {
return errors.New("cameraUrls can not less than 2")
}
default:
return errors.New("unknown scene type:" + cmd.Scene)
}
if u, e := url.Parse(cmd.PushUrl); nil != e {
return errors.New("invalid pushUrl:" + e.Error())
} else if u.Scheme != "http" && u.Scheme != "https" {
return errors.New("can not support pushUrl with " + u.Scheme)
}
return nil
switch cmd.Scene {
case "pullUp", "standJump", "sitUps":
if len(cmd.CameraUrls) < 1 {
return errors.New("cameraUrls can not less than 1")
}
case "race":
if len(cmd.CameraUrls) < 2 {
return errors.New("cameraUrls can not less than 2")
}
default:
return errors.New("unknown scene type:" + cmd.Scene)
}
if u, e := url.Parse(cmd.PushUrl); nil != e {
return errors.New("invalid pushUrl:" + e.Error())
} else if u.Scheme != "http" && u.Scheme != "https" {
return errors.New("can not support pushUrl with " + u.Scheme)
}
return nil
}
func setScene(c *fiber.Ctx) error {
var sceneCmd SceneCommand
if err := c.BodyParser(&sceneCmd); nil != err {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", err.Error(), 200)
} else if nil != currentScene {
return failureResponse(c, "1", "A working scene is started", 200)
} else if err = validateSceneCmd(sceneCmd); nil != err {
return failureResponse(c, "2", err.Error(), 200)
} else {
// currentScene = &sceneCmd
log.Println("Starting scene ...")
log.Println(sceneCmd)
if e := sceneTaskProc(&sceneCmd); nil != e {
return failureResponse(c, "3", err.Error(), 200)
} else {
return successResponse(c, "Updated successfully.")
}
}
var sceneCmd SceneCommand
if err := c.BodyParser(&sceneCmd); nil != err {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", err.Error(), 200)
} else if nil != currentScene {
return failureResponse(c, "1", "A working scene is started", 200)
} else if err = validateSceneCmd(sceneCmd); nil != err {
return failureResponse(c, "2", err.Error(), 200)
} else {
// currentScene = &sceneCmd
log.Println("Starting scene ...")
log.Println(sceneCmd)
if e := sceneTaskProc(&sceneCmd); nil != e {
return failureResponse(c, "3", err.Error(), 200)
} else {
return successResponse(c, "Updated successfully.")
}
}
}
func setAreaConfig(c *fiber.Ctx) error {
var cfg []AreaConfig
if e := c.BodyParser(&cfg); nil != e {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", e.Error(), 200)
} else if nil == currentScene {
return failureResponse(c, "1", "No working scene is started", 200)
}
currentScene.areaConfigs = cfg
return successResponse(c, "Updated successfully.")
var cfg []AreaConfig
if e := c.BodyParser(&cfg); nil != e {
log.Println("ERROR:", c.Body())
return failureResponse(c, "-1", e.Error(), 200)
} else if nil == currentScene {
return failureResponse(c, "1", "No working scene is started", 200)
}
currentScene.areaConfigs = cfg
return successResponse(c, "Updated successfully.")
}
func getAreaConfig(c *fiber.Ctx) error {
if nil == currentScene {
return failureResponse(c, "1", "No working scene is started", 200)
} else {
return successResponseData(c, "success", currentScene.areaConfigs)
}
if nil == currentScene {
return failureResponse(c, "1", "No working scene is started", 200)
} else {
return successResponseData(c, "success", currentScene.areaConfigs)
}
}
func stopScene(c *fiber.Ctx) error {
if nil != testStopChannel {
testStopChannel <- true
}
if nil != sceneStopChannel {
sceneStopChannel <- true
}
return successResponse(c, "Updated successfully.")
if nil != testStopChannel {
testStopChannel <- true
}
if nil != sceneStopChannel {
sceneStopChannel <- true
}
return successResponse(c, "Updated successfully.")
}
func getStatus(c *fiber.Ctx) error {
var scene = ""
var seq int64 = 0
if nil != currentScene {
scene = currentScene.Scene
}
if nil != currentTest {
seq = currentTest.SequenceId
}
return successResponseEx(c, "success", scene, seq)
var scene = ""
var seq int64 = 0
if nil != currentScene {
scene = currentScene.Scene
}
if nil != currentTest {
seq = currentTest.SequenceId
}
return successResponseEx(c, "success", scene, seq)
}
var capturedImage image.Image
func sceneTaskProc(cmd *SceneCommand) error {
var f func(int64) error = nil
switch cmd.Scene {
case "pullUp":
f = pullUpTask
case "standJump":
f = standJumpTask
case "sitUps":
f = sitUpsTask
case "race":
f = raceTask
default:
return errors.New("unknown scene type:" + cmd.Scene)
}
sceneStopChannel = make(chan bool)
var tk = time.NewTicker(time.Second * 3)
go func() {
defer func() {
tk.Stop()
close(sceneStopChannel)
sceneStopChannel = nil
}()
currentScene = cmd
currentScene.proc = f
time.Sleep(time.Second)
var st = SceneReadyResponse{
Scene: cmd.Scene,
Timestamp: time.Now().Unix(),
SceneData: SceneData{
State: "SCENE READY",
LiveUrl: cmd.CameraUrls,
},
}
go httpPostEx(cmd.PushUrl, st, 3)
for {
select {
case <-tk.C:
// log.Println(" ...")
case <-sceneStopChannel:
log.Println("Scene Stopping...")
go pushEvent(cmd.PushUrl, cmd.Scene, 0, "SCENE END", false)
currentScene = nil
return
}
}
}()
return nil
var f func(int64) error = nil
switch cmd.Scene {
case "pullUp":
f = pullUpTask
case "standJump":
f = standJumpTask
case "sitUps":
f = sitUpsTask
case "race":
f = raceTask
default:
return errors.New("unknown scene type:" + cmd.Scene)
}
if len(cmd.CameraUrls) < 1 {
return errors.New("no camera specified")
}
var cc *gortsplib.Client
if c, u, e := setupRtsp(cmd.CameraUrls[0]); nil != e {
log.Println("setupRtsp failed:", e)
} else {
cc = c
go func() {
if e := playRtsp(c, u, func(img image.Image) {
//log.Println("...capturedImage:>", img.Bounds())
capturedImage = img
}); nil != e {
//log.Println("playRtsp failed:>", e)
}
}()
}
sceneStopChannel = make(chan bool)
var tk = time.NewTicker(time.Second * 3)
go func() {
defer func() {
tk.Stop()
close(sceneStopChannel)
if nil != cc {
_ = cc.Close()
}
sceneStopChannel = nil
}()
currentScene = cmd
currentScene.proc = f
time.Sleep(time.Second)
var st = SceneReadyResponse{
Scene: cmd.Scene,
Timestamp: time.Now().Unix(),
SceneData: SceneData{
State: "SCENE READY",
LiveUrl: cmd.CameraUrls,
},
}
go httpPostEx(cmd.PushUrl, st, 3)
for {
select {
case <-tk.C:
// log.Println(" ...")
case <-sceneStopChannel:
log.Println("Scene Stopping...")
go pushEvent(cmd.PushUrl, cmd.Scene, 0, "SCENE END", nil)
currentScene = nil
return
}
}
}()
return nil
}
func init() {
currentScene = nil
sceneStopChannel = nil
currentScene = nil
sceneStopChannel = nil
}

Loading…
Cancel
Save