diff --git a/restlet/types.go b/restlet/types.go index c6c128e..551cb98 100644 --- a/restlet/types.go +++ b/restlet/types.go @@ -18,10 +18,11 @@ import ( type Parameters map[string]string type ControlResult struct { - Total int64 `json:"total"` - Count int64 `json:"count"` - Offset int64 `json:"offset"` - Limit int64 `json:"limit"` + Total int64 `json:"total"` + Count int64 `json:"count"` + Offset int64 `json:"offset"` + Limit int64 `json:"limit"` + ProgressURL string `json:"progressUrl,omitempty"` } type DebugInfo struct { @@ -29,6 +30,12 @@ type DebugInfo struct { Finish int64 } +type Progress struct { + ID string `json:"id"` + Finished int `json:"finished"` + Total int `json:"total"` +} + type Result struct { Code uint `json:"code"` Message string `json:"message,omitempty"` @@ -81,6 +88,8 @@ type Context interface { type RequestContext interface { Context Request() *http.Request + NewProgress(total int, prefixes ...interface{}) (string, string, error) // Return ID and progress URL + UpdateProgress(id string, finished int) } type ContextProvider interface { diff --git a/service/context.go b/service/context.go index 55ec1d8..b83bfdd 100644 --- a/service/context.go +++ b/service/context.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "path" _ "github.com/lib/pq" @@ -13,6 +14,7 @@ import ( "cygnux.net/kepler/config" "cygnux.net/kepler/database" "cygnux.net/kepler/kv" + "cygnux.net/kepler/misc" "cygnux.net/kepler/restlet" ) @@ -28,6 +30,31 @@ func NewRequestContext(service *KeplerService, request *http.Request) *RequestCo return ctx } +const KeplerBgTaskBucketId = "kepler_service_bg_tasks" + +func (rc *RequestContext) NewProgress(total int, prefixes ...interface{}) (string, string, error) { // Return ID and progress URL + id := misc.MakeResourceId(prefixes...).String() + uri := path.Join(rc._service.baseProgressUri, id) + prog := restlet.Progress{ + ID: id, + Total: total, + } + if e := rc.Bucket(KeplerBgTaskBucketId).Set(id, prog); nil != e { + return "", "", e + } + return id, uri, nil +} + +func (rc *RequestContext) UpdateProgress(id string, finished int) { + if p, e := rc.Bucket(KeplerBgTaskBucketId).Get(id); nil == e { + if prog, b := p.(restlet.Progress); b { + prog.Finished = finished + _ = rc.Bucket(KeplerBgTaskBucketId).Set(id, prog) + } + + } +} + func (rc *RequestContext) SQL(name ...string) *sql.DB { if len(name) > 0 { if v, b := rc._service.db[name[0]]; b { diff --git a/service/service.go b/service/service.go index 6e0aa77..41868c4 100644 --- a/service/service.go +++ b/service/service.go @@ -1,6 +1,7 @@ package service import ( + "fmt" "io" "net/http" "net/http/pprof" @@ -47,16 +48,17 @@ func validateMethod(r *http.Request, methods ...string) bool { } type KeplerService struct { - prefix string - config config.Config - router *mux.Router - sessionKeeper func(next restlet.RequestHandler) restlet.RequestHandler - db map[string]database.DBI - cache map[string]cache.Cache - kvstore map[string]kv.KV - bucketMngr *bucket.Manager - mpub msq.Publisher - msub msq.Subscriber + prefix string + config config.Config + router *mux.Router + sessionKeeper func(next restlet.RequestHandler) restlet.RequestHandler + db map[string]database.DBI + cache map[string]cache.Cache + kvstore map[string]kv.KV + bucketMngr *bucket.Manager + mpub msq.Publisher + msub msq.Subscriber + baseProgressUri string // initialHandlers []*InitializeHandler // for initialize requestHandlers []*RequestHandler // for HTTP Request @@ -206,7 +208,6 @@ func (svr *KeplerService) Initialize() error { return nil } - func (svr *KeplerService) SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandler) { svr.sessionKeeper = ff } @@ -215,6 +216,10 @@ func (svr *KeplerService) NewContext(request *http.Request) restlet.RequestConte return NewRequestContext(svr, request) } +func (svr *KeplerService) SetProgressURI(ss string) { + svr.baseProgressUri = ss +} + func makeMsgSubFunc(ctx restlet.TaskContext, h restlet.TaskletHandler) func([]byte) error { var f = func(data []byte) error { if e := h.Handle(ctx, data); nil != e { @@ -306,6 +311,10 @@ func (svr *KeplerService) Serve() error { pCron.Stop() } }() + if svr.baseProgressUri != "" { + svr.baseProgressUri = "_internal/tasks" + } + svr.RegisterRequest(svr.baseProgressUri, restlet.RestletFuncEx(svr.ProgressHandle), "GET") for _, h := range svr.requestHandlers { fullPrefix := strings.TrimRight(svr.prefix, "/") + "/" + strings.TrimLeft(h.Prefix, "/") // path.Join(svr.prefix, h.Prefix) log.Infoln("Routing :", fullPrefix, h.Methods) @@ -412,6 +421,20 @@ func (svr *KeplerService) RegisterSchedule(schedule string, handler restlet.Task }) } +func (svr *KeplerService) ProgressHandle(ctx restlet.RequestContext, params restlet.Parameters, queries restlet.Parameters, data []byte) (int, interface{}, error) { + if id, b := params.GetString("id"); b { + if p, e := ctx.Bucket(KeplerBgTaskBucketId).Get(id); nil != e { + return restlet.ErrorNotFound, nil, e + } else if prog, b := p.(restlet.Progress); b { + return restlet.SuccessOk, prog, nil + } else { + return restlet.ErrorNotFound, nil, fmt.Errorf("{id} not found") + } + } else { + return restlet.ErrorNotFound, nil, fmt.Errorf("{id} requied") + } +} + var ( builtinService = &KeplerService{ router: mux.NewRouter().StrictSlash(true), @@ -449,6 +472,10 @@ func SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandle builtinService.SetSessionKeeper(ff) } +func SetProgressURI(ss string) { + builtinService.SetProgressURI(ss) +} + func RegisterInitialize(order int, name string, handler restlet.TaskletHandler) { builtinService.RegisterInitialize(order, name, handler) }