Browse Source

Updated: add builtinService to allowed quick app integration.

v0.6
Mingcai SHEN 6 years ago
parent
commit
754bc93fdd
1 changed files with 81 additions and 38 deletions
  1. +81
    -38
      service/service.go

+ 81
- 38
service/service.go View File

@ -72,21 +72,28 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
log.Debugln("NewService:> ", prefixes) log.Debugln("NewService:> ", prefixes)
//var e error //var e error
svr := &KeplerService{ svr := &KeplerService{
config: cfg,
router: mux.NewRouter().StrictSlash(true), router: mux.NewRouter().StrictSlash(true),
db: make(map[string]database.DBI), db: make(map[string]database.DBI),
cache: make(map[string]cache.Cache), cache: make(map[string]cache.Cache),
kvstore: make(map[string]kv.KV), kvstore: make(map[string]kv.KV),
} }
if e := svr.Config(cfg, prefixes...); nil != e {
return nil, e
}
return svr, nil
}
func (svr *KeplerService) Config(cfg config.Config, prefixes ...string) error {
if len(prefixes) > 0 { if len(prefixes) > 0 {
svr.prefix = prefixes[0] svr.prefix = prefixes[0]
} else { } else {
svr.prefix = urlPrefix svr.prefix = urlPrefix
} }
svr.config = cfg
if db, e := database.SetupDBI(cfg.Sub("database")); nil != e { if db, e := database.SetupDBI(cfg.Sub("database")); nil != e {
log.Errorln("Open database failed:> ", e) log.Errorln("Open database failed:> ", e)
return nil, e
return e
} else { } else {
svr.db["default"] = db svr.db["default"] = db
} }
@ -95,7 +102,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
for _, x := range extraDatabases { for _, x := range extraDatabases {
if db, e := database.SetupDBI(cfg.Sub("database" + "_" + x)); nil != e { if db, e := database.SetupDBI(cfg.Sub("database" + "_" + x)); nil != e {
log.Errorf("Open database(%s) failed:> %s\n", x, e) log.Errorf("Open database(%s) failed:> %s\n", x, e)
return nil, e
return e
} else { } else {
svr.db[x] = db svr.db[x] = db
} }
@ -104,7 +111,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
if cc, e := cache.SetupCache(cfg.Sub("cache")); nil != e { if cc, e := cache.SetupCache(cfg.Sub("cache")); nil != e {
log.Errorln("Make Cache failed:> ", e) log.Errorln("Make Cache failed:> ", e)
return nil, e
return e
} else { } else {
svr.cache["default"] = cc svr.cache["default"] = cc
} }
@ -113,7 +120,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
for _, x := range extraCaches { for _, x := range extraCaches {
if cc, e := cache.SetupCache(cfg.Sub("cache" + "_" + x)); nil != e { if cc, e := cache.SetupCache(cfg.Sub("cache" + "_" + x)); nil != e {
log.Errorf("Make Cache(%s) failed:> %s\n", x, e) log.Errorf("Make Cache(%s) failed:> %s\n", x, e)
return nil, e
return e
} else { } else {
svr.cache[x] = cc svr.cache[x] = cc
} }
@ -122,7 +129,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
if kk, e := kv.SetupKV(cfg.Sub("kv")); nil != e { if kk, e := kv.SetupKV(cfg.Sub("kv")); nil != e {
log.Errorln("Make KVStore failed:> ", e) log.Errorln("Make KVStore failed:> ", e)
return nil, e
return e
} else { } else {
svr.kvstore["default"] = kk svr.kvstore["default"] = kk
} }
@ -131,7 +138,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
for _, x := range extraKvs { for _, x := range extraKvs {
if kk, e := kv.SetupKV(cfg.Sub("kv" + "_" + x)); nil != e { if kk, e := kv.SetupKV(cfg.Sub("kv" + "_" + x)); nil != e {
log.Errorf("Make KVStore(%s) failed:> %s\n", x, e) log.Errorf("Make KVStore(%s) failed:> %s\n", x, e)
return nil, e
return e
} else { } else {
svr.kvstore[x] = kk svr.kvstore[x] = kk
} }
@ -140,13 +147,13 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
if m, e := msq.SetupPublisher(cfg.Sub("publish")); nil != e { if m, e := msq.SetupPublisher(cfg.Sub("publish")); nil != e {
log.Errorln("Make MessagePub failed:> ", e) log.Errorln("Make MessagePub failed:> ", e)
return nil, e
return e
} else { } else {
svr.mpub = m svr.mpub = m
} }
if m, e := msq.SetupSubscriber(cfg.Sub("subscribe")); nil != e { if m, e := msq.SetupSubscriber(cfg.Sub("subscribe")); nil != e {
log.Errorln("Make MessageSub failed:> ", e) log.Errorln("Make MessageSub failed:> ", e)
return nil, e
return e
} else { } else {
svr.msub = m svr.msub = m
} }
@ -158,7 +165,7 @@ func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
//svr.taskHandlers = make(map[string]restlet.TaskletHandler) //svr.taskHandlers = make(map[string]restlet.TaskletHandler)
//svr.msgHandlers = make(map[string]restlet.TaskletHandler) //svr.msgHandlers = make(map[string]restlet.TaskletHandler)
//svr.loadBuiltinHandlers() //svr.loadBuiltinHandlers()
return svr, nil
return nil
} }
func (svr *KeplerService) Router() *mux.Router { func (svr *KeplerService) Router() *mux.Router {
@ -182,11 +189,11 @@ func (svr *KeplerService) Initialize() error {
return nil return nil
} }
var sessionKeeper func(ctx restlet.Context, next http.Handler) http.Handler
func SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
sessionKeeper = ff
}
//var sessionKeeper func(ctx restlet.Context, next http.Handler) http.Handler
//
//func SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
// sessionKeeper = ff
//}
func (svr *KeplerService) SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) { func (svr *KeplerService) SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
svr.sessionKeeper = ff svr.sessionKeeper = ff
@ -196,21 +203,21 @@ func (svr *KeplerService) NewContext(request *http.Request) restlet.RequestConte
return NewRequestContext(svr, request) return NewRequestContext(svr, request)
} }
func (svr *KeplerService) makeRestletHandler(h restlet.RestletHandler, predictor restlet.RequestPredictor, methods []string, cache *restlet.CacheController) http.Handler {
return restlet.MakeRestletHandler(h, predictor, svr, methods, cache)
}
func (svr *KeplerService) registerRestlet(prefix string, predictor restlet.RequestPredictor, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
h := svr.makeRestletHandler(handler, predictor, methods, cache)
log.Debugln(">>", svr.prefix, prefix)
if svr.prefix == "" {
log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), prefix)
svr.router.Handle(prefix, h)
} else {
log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), svr.prefix+prefix)
svr.router.Handle(svr.prefix+prefix, h)
}
}
//func (svr *KeplerService) makeRestletHandler(h restlet.RestletHandler, predictor restlet.RequestPredictor, methods []string, cache *restlet.CacheController) http.Handler {
// return restlet.MakeRestletHandler(h, predictor, svr, methods, cache)
//}
//
//func (svr *KeplerService) registerRestlet(prefix string, predictor restlet.RequestPredictor, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
// h := svr.makeRestletHandler(handler, predictor, methods, cache)
// log.Debugln(">>", svr.prefix, prefix)
// if svr.prefix == "" {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), prefix)
// svr.router.Handle(prefix, h)
// } else {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), svr.prefix+prefix)
// svr.router.Handle(svr.prefix+prefix, h)
// }
//}
//func (svr *KeplerService) TaskProc() { //func (svr *KeplerService) TaskProc() {
// for t := range svr.taskChans { // for t := range svr.taskChans {
@ -226,22 +233,22 @@ func (svr *KeplerService) registerRestlet(prefix string, predictor restlet.Reque
// } // }
//} //}
func (svr *KeplerService) MessageProc() {
func (svr *KeplerService) messageProc() {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, h := range svr.messageHandlers { for _, h := range svr.messageHandlers {
wg.Add(1) wg.Add(1)
//msgKey := k //msgKey := k
//msgHandle := h //msgHandle := h
err := svr.msub.Subscribe(h.Topic, func(data []byte) error { err := svr.msub.Subscribe(h.Topic, func(data []byte) error {
log.Debugln("MessageProc:> get msg via:", h.Topic)
log.Debugln("messageProc:> get msg via:", h.Topic)
if e := h.Handler.Handle(NewTaskContext(svr), data); nil != e { if e := h.Handler.Handle(NewTaskContext(svr), data); nil != e {
log.Errorln("MessageProc:> Call Handler failed:", e)
log.Errorln("messageProc:> Call Handler failed:", e)
return e return e
} }
return nil return nil
}) })
if nil != err { if nil != err {
log.Errorln("MessageProc:> Subscribe failed:", err)
log.Errorln("messageProc:> Subscribe failed:", err)
return return
} }
} }
@ -277,7 +284,7 @@ func (svr *KeplerService) runTaskHandler(handler restlet.TaskletHandler) func()
func (svr *KeplerService) Serve() error { func (svr *KeplerService) Serve() error {
//go svr.TaskProc() //go svr.TaskProc()
go svr.MessageProc()
go svr.messageProc()
var pCron *cron.Cron var pCron *cron.Cron
if len(svr.scheduleHandlers) > 0 { if len(svr.scheduleHandlers) > 0 {
pCron = cron.New() pCron = cron.New()
@ -302,9 +309,9 @@ func (svr *KeplerService) Serve() error {
listenAddr := svr.config.GetString("service.listen", ":8080") listenAddr := svr.config.GetString("service.listen", ":8080")
logRequestFile := svr.config.GetString("service.log_request_file", "") logRequestFile := svr.config.GetString("service.log_request_file", "")
log.Infoln("Service:> Listening on ", listenAddr) log.Infoln("Service:> Listening on ", listenAddr)
if svr.sessionKeeper == nil && sessionKeeper != nil {
svr.sessionKeeper = sessionKeeper
}
//if svr.sessionKeeper == nil && sessionKeeper != nil {
// svr.sessionKeeper = sessionKeeper
//}
var rootHandler http.Handler var rootHandler http.Handler
if svr.sessionKeeper != nil { if svr.sessionKeeper != nil {
rootHandler = svr.sessionKeeper(NewTaskContext(svr), svr.router) rootHandler = svr.sessionKeeper(NewTaskContext(svr), svr.router)
@ -386,3 +393,39 @@ func (svr *KeplerService) RegisterSchedule(schedule string, handler restlet.Task
Handler: handler, Handler: handler,
}) })
} }
var (
builtinService = &KeplerService{}
)
func Serve() error {
return builtinService.Serve()
}
func Config(cfg config.Config, prefixes ...string) error {
return builtinService.Config(cfg, prefixes...)
}
func Initialize() error {
return builtinService.Initialize()
}
func SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
builtinService.SetSessionKeeper(ff)
}
func RegisterInitialize(name string, handler restlet.TaskletHandler) {
builtinService.RegisterInitialize(name, handler)
}
func RegisterRequest(prefix string, handler restlet.RequestHandler, methods ...string) {
builtinService.RegisterRequest(prefix, handler, methods...)
}
func RegisterMessage(topic string, handler restlet.TaskletHandler) {
builtinService.RegisterMessage(topic, handler)
}
func RegisterSchedule(schedule string, handler restlet.TaskletHandler) {
builtinService.RegisterSchedule(schedule, handler)
}

Loading…
Cancel
Save