Browse Source

Add back ground task progress support.

v0.7
Mingcai SHEN 5 years ago
parent
commit
4069a19c8c
3 changed files with 78 additions and 15 deletions
  1. +13
    -4
      restlet/types.go
  2. +27
    -0
      service/context.go
  3. +38
    -11
      service/service.go

+ 13
- 4
restlet/types.go View File

@ -18,10 +18,11 @@ import (
type Parameters map[string]string
type ControlResult struct {
Total int64 `json:"total"`
Count int64 `json:"count"`
Offset int64 `json:"offset"`
Limit int64 `json:"limit"`
Total int64 `json:"total"`
Count int64 `json:"count"`
Offset int64 `json:"offset"`
Limit int64 `json:"limit"`
ProgressURL string `json:"progressUrl,omitempty"`
}
type DebugInfo struct {
@ -29,6 +30,12 @@ type DebugInfo struct {
Finish int64
}
type Progress struct {
ID string `json:"id"`
Finished int `json:"finished"`
Total int `json:"total"`
}
type Result struct {
Code uint `json:"code"`
Message string `json:"message,omitempty"`
@ -81,6 +88,8 @@ type Context interface {
type RequestContext interface {
Context
Request() *http.Request
NewProgress(total int, prefixes ...interface{}) (string, string, error) // Return ID and progress URL
UpdateProgress(id string, finished int)
}
type ContextProvider interface {

+ 27
- 0
service/context.go View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"path"
_ "github.com/lib/pq"
@ -13,6 +14,7 @@ import (
"cygnux.net/kepler/config"
"cygnux.net/kepler/database"
"cygnux.net/kepler/kv"
"cygnux.net/kepler/misc"
"cygnux.net/kepler/restlet"
)
@ -28,6 +30,31 @@ func NewRequestContext(service *KeplerService, request *http.Request) *RequestCo
return ctx
}
const KeplerBgTaskBucketId = "kepler_service_bg_tasks"
func (rc *RequestContext) NewProgress(total int, prefixes ...interface{}) (string, string, error) { // Return ID and progress URL
id := misc.MakeResourceId(prefixes...).String()
uri := path.Join(rc._service.baseProgressUri, id)
prog := restlet.Progress{
ID: id,
Total: total,
}
if e := rc.Bucket(KeplerBgTaskBucketId).Set(id, prog); nil != e {
return "", "", e
}
return id, uri, nil
}
func (rc *RequestContext) UpdateProgress(id string, finished int) {
if p, e := rc.Bucket(KeplerBgTaskBucketId).Get(id); nil == e {
if prog, b := p.(restlet.Progress); b {
prog.Finished = finished
_ = rc.Bucket(KeplerBgTaskBucketId).Set(id, prog)
}
}
}
func (rc *RequestContext) SQL(name ...string) *sql.DB {
if len(name) > 0 {
if v, b := rc._service.db[name[0]]; b {

+ 38
- 11
service/service.go View File

@ -1,6 +1,7 @@
package service
import (
"fmt"
"io"
"net/http"
"net/http/pprof"
@ -47,16 +48,17 @@ func validateMethod(r *http.Request, methods ...string) bool {
}
type KeplerService struct {
prefix string
config config.Config
router *mux.Router
sessionKeeper func(next restlet.RequestHandler) restlet.RequestHandler
db map[string]database.DBI
cache map[string]cache.Cache
kvstore map[string]kv.KV
bucketMngr *bucket.Manager
mpub msq.Publisher
msub msq.Subscriber
prefix string
config config.Config
router *mux.Router
sessionKeeper func(next restlet.RequestHandler) restlet.RequestHandler
db map[string]database.DBI
cache map[string]cache.Cache
kvstore map[string]kv.KV
bucketMngr *bucket.Manager
mpub msq.Publisher
msub msq.Subscriber
baseProgressUri string
//
initialHandlers []*InitializeHandler // for initialize
requestHandlers []*RequestHandler // for HTTP Request
@ -206,7 +208,6 @@ func (svr *KeplerService) Initialize() error {
return nil
}
func (svr *KeplerService) SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandler) {
svr.sessionKeeper = ff
}
@ -215,6 +216,10 @@ func (svr *KeplerService) NewContext(request *http.Request) restlet.RequestConte
return NewRequestContext(svr, request)
}
func (svr *KeplerService) SetProgressURI(ss string) {
svr.baseProgressUri = ss
}
func makeMsgSubFunc(ctx restlet.TaskContext, h restlet.TaskletHandler) func([]byte) error {
var f = func(data []byte) error {
if e := h.Handle(ctx, data); nil != e {
@ -306,6 +311,10 @@ func (svr *KeplerService) Serve() error {
pCron.Stop()
}
}()
if svr.baseProgressUri != "" {
svr.baseProgressUri = "_internal/tasks"
}
svr.RegisterRequest(svr.baseProgressUri, restlet.RestletFuncEx(svr.ProgressHandle), "GET")
for _, h := range svr.requestHandlers {
fullPrefix := strings.TrimRight(svr.prefix, "/") + "/" + strings.TrimLeft(h.Prefix, "/") // path.Join(svr.prefix, h.Prefix)
log.Infoln("Routing :", fullPrefix, h.Methods)
@ -412,6 +421,20 @@ func (svr *KeplerService) RegisterSchedule(schedule string, handler restlet.Task
})
}
func (svr *KeplerService) ProgressHandle(ctx restlet.RequestContext, params restlet.Parameters, queries restlet.Parameters, data []byte) (int, interface{}, error) {
if id, b := params.GetString("id"); b {
if p, e := ctx.Bucket(KeplerBgTaskBucketId).Get(id); nil != e {
return restlet.ErrorNotFound, nil, e
} else if prog, b := p.(restlet.Progress); b {
return restlet.SuccessOk, prog, nil
} else {
return restlet.ErrorNotFound, nil, fmt.Errorf("{id} not found")
}
} else {
return restlet.ErrorNotFound, nil, fmt.Errorf("{id} requied")
}
}
var (
builtinService = &KeplerService{
router: mux.NewRouter().StrictSlash(true),
@ -449,6 +472,10 @@ func SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandle
builtinService.SetSessionKeeper(ff)
}
func SetProgressURI(ss string) {
builtinService.SetProgressURI(ss)
}
func RegisterInitialize(order int, name string, handler restlet.TaskletHandler) {
builtinService.RegisterInitialize(order, name, handler)
}

Loading…
Cancel
Save