Browse Source

Adding Initialize handling.

develop
Mingcai SHEN 7 years ago
parent
commit
d06d148819
1 changed files with 227 additions and 206 deletions
  1. +227
    -206
      service/service.go

+ 227
- 206
service/service.go View File

@ -1,207 +1,228 @@
package service
import (
"net/http"
"database/sql"
"github.com/gorilla/mux"
_ "github.com/lib/pq"
log "github.com/Sirupsen/logrus"
"strings"
"cygnux/kepler/restlet"
"sync"
"cygnux/kepler/cache"
"cygnux/kepler/kv"
"cygnux/kepler/utils"
"cygnux/kepler/msq"
"cygnux/kepler/configs"
)
type KeplerService struct {
prefix string
option configs.GenericOption
params configs.ParameterOption
db *sql.DB
db_schema string
router *mux.Router
cache cache.Cache
kvstore kv.KV
mpub msq.Publisher
msub msq.Subscriber
_task_chan chan restlet.TaskObject
_task_handlers map[string]restlet.TaskletHandler
_msg_handlers map[string]restlet.TaskletHandler
}
const URL_PREFIX = "/"
func NewService(op OptionProvider, prefixes ...string) (*KeplerService, error) {
log.Debugln("NewService:> ", op.Option())
var e error
if len(prefixes) == 0 {
prefixes = []string{URL_PREFIX}
}
svr := &KeplerService{
params:op.Parameters(),
option: op.Option(),
router: mux.NewRouter().StrictSlash(true),
prefix: prefixes[0],
}
if svr.db, svr.db_schema, e = utils.SqlDBConnect(op.Option().DB); nil != e {
log.Errorln("Open database failed:> ", e)
return nil, e
}
if svr.cache, e = cache.NewCache(op.Option().Cache); nil != e {
log.Errorln("Make Cache failed:> ", e)
return nil, e
}
if svr.kvstore, e = kv.NewKV(op.Option().KV); nil != e {
log.Errorln("Make KVStore failed:> ", e)
return nil, e
}
if svr.mpub, e = msq.NewPublisher(op.Option().MPub); nil != e {
log.Errorln("Make MPub failed:> ", e)
return nil, e
}
if svr.msub, e = msq.NewSubscriber(op.Option().MSub); nil != e {
log.Errorln("Make MSub failed:> ", e)
return nil, e
}
svr._task_chan = make(chan restlet.TaskObject, 1024)
if op.Option().Debug {
restlet.SetDebug(op.Option().Debug)
log.Debugln("Restlet debug on!")
}
svr._task_handlers = make(map[string]restlet.TaskletHandler)
svr._msg_handlers = make(map[string]restlet.TaskletHandler)
svr.load_builtin_handlers()
return svr, nil
}
const X_CYGNUX_SIGN = "X-CYGNUX-SIGN"
func (self *KeplerService) Router() *mux.Router {
return self.router
}
func (self *KeplerService) Predicate(request *http.Request) (string, bool) {
sign := request.Header.Get(X_CYGNUX_SIGN)
log.Infoln("X-CYGNUX-SIGN:> ", sign)
return "", true
if sign == "" {
return "", false
}
return "", true
}
func (self *KeplerService) NewContext(request *http.Request) restlet.RequestContext {
return NewRequestContext(self, request)
}
func (self *KeplerService) makeRestletHandler(h restlet.RestletHandler, methods []string, cache *restlet.CacheController) http.Handler {
return restlet.MakeRestletHandler(h, self, self, methods, cache)
}
func (self *KeplerService) registerRestlet(prefix string, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
h := self.makeRestletHandler(handler, methods, cache)
log.Infof("Register:> Registering %s: %s ...", strings.Join(methods, ","), self.prefix + "/" + prefix)
self.router.Handle(self.prefix + "/" + prefix, h)
}
func (self *KeplerService) TaskProc() {
for t := range self._task_chan {
if h, ok := self._task_handlers[t.Queue]; ok {
if e := h.Handle(NewTaskContext(self), t.Params...); nil != e {
log.Errorf("TaskProc:> %s Handle failed:> %s \n", t.Queue, e)
}
} else {
log.Warnln("TaskProc:> Missing task handler for:", t.Queue)
}
}
}
func (self *KeplerService) MessageProc() {
wg := &sync.WaitGroup{}
for k, h := range self._msg_handlers {
wg.Add(1)
err := self.msub.Subscribe(k, func(data []byte) error {
return h.Handle(NewTaskContext(self), data)
})
if nil != err {
return
}
}
wg.Wait()
}
func (self *KeplerService) Serve() error {
go self.TaskProc()
go self.MessageProc()
return http.ListenAndServe(self.option.ListenAddr, self.router)
}
func (self *KeplerService) Register_RestletHandler(prefix string, methods []string, handler restlet.RestletHandler, cache_ctrl *restlet.CacheController) {
self.registerRestlet(prefix, methods, handler, cache_ctrl)
}
func (self *KeplerService) Register_TaskHandler(topic string, handler restlet.TaskletHandler) {
self._task_handlers[topic] = handler
}
func (self *KeplerService) Register_MessageHandler(topic string, handler restlet.TaskletHandler) {
self._msg_handlers[topic] = handler
}
func (self *KeplerService) load_builtin_handlers() {
for _, x := range builtin_restlet_handlers {
self.registerRestlet(x.prefix, x.methods, x.handler, x.cache_control)
}
for _, x := range builtin_tasklet_handlers {
self._task_handlers[x.name] = x.handler
}
for _, x := range builtin_message_handlers {
self._msg_handlers[x.name] = x.handler
}
}
var builtin_restlet_handlers []ServiceRestlet = []ServiceRestlet{}
var builtin_tasklet_handlers []ServiceTasklet = []ServiceTasklet{}
var builtin_message_handlers []ServiceMessagelet = []ServiceMessagelet{}
func RegisterRestletHandleFunc(f func(ctx restlet.RequestContext, url_params restlet.Parameters, queries restlet.Parameters, post_data []byte) (*restlet.RestletResult, error),
methods string, rcc *restlet.CacheController, prefixes ...string) {
_registerHandler(restlet.RestletHandleFunc(f), methods, rcc, prefixes...)
}
func _registerHandler(handler restlet.RestletHandler, methods string, rcc *restlet.CacheController, prefixes ...string) {
for _, prefix := range prefixes {
prefix = strings.TrimLeft(strings.TrimSpace(prefix), "/")
if prefix == "" {
continue
}
p := prefix
ms := strings.Split(methods, ",")
if len(ms) == 0 {
ms = []string{"*"}
} else {
for i, x := range ms {
ms[i] = strings.ToUpper(x)
}
}
builtin_restlet_handlers = append(builtin_restlet_handlers, ServiceRestlet{
methods: ms,
prefix: p,
handler: handler,
cache_control: rcc,
})
}
}
func RegisterTasklet(queue string, f func(restlet.TaskContext, ...interface{}) error) {
builtin_tasklet_handlers = append(builtin_tasklet_handlers, ServiceTasklet{name: queue, handler: restlet.TaskletHandlerFunc(f)})
}
func RegisterMsgHandle(queue string, f func(restlet.TaskContext, ...interface{}) error) {
builtin_message_handlers = append(builtin_message_handlers, ServiceMessagelet{name: queue, handler: restlet.TaskletHandlerFunc(f)})
package service
import (
"net/http"
"database/sql"
"github.com/gorilla/mux"
_ "github.com/lib/pq"
log "github.com/Sirupsen/logrus"
"strings"
"cygnux/kepler/restlet"
"sync"
"cygnux/kepler/cache"
"cygnux/kepler/kv"
"cygnux/kepler/utils"
"cygnux/kepler/msq"
"cygnux/kepler/configs"
)
type KeplerService struct {
prefix string
option configs.GenericOption
params configs.ParameterOption
db *sql.DB
db_schema string
router *mux.Router
cache cache.Cache
kvstore kv.KV
mpub msq.Publisher
msub msq.Subscriber
_task_chan chan restlet.TaskObject
_task_handlers map[string]restlet.TaskletHandler
_msg_handlers map[string]restlet.TaskletHandler
}
const URL_PREFIX = "/"
func NewService(op OptionProvider, prefixes ...string) (*KeplerService, error) {
log.Debugln("NewService:> ", op.Option())
var e error
if len(prefixes) == 0 {
prefixes = []string{URL_PREFIX}
}
svr := &KeplerService{
params:op.Parameters(),
option: op.Option(),
router: mux.NewRouter().StrictSlash(true),
prefix: prefixes[0],
}
if svr.db, svr.db_schema, e = utils.SqlDBConnect(op.Option().DB); nil != e {
log.Errorln("Open database failed:> ", e)
return nil, e
}
if svr.cache, e = cache.NewCache(op.Option().Cache); nil != e {
log.Errorln("Make Cache failed:> ", e)
return nil, e
}
if svr.kvstore, e = kv.NewKV(op.Option().KV); nil != e {
log.Errorln("Make KVStore failed:> ", e)
return nil, e
}
if svr.mpub, e = msq.NewPublisher(op.Option().MPub); nil != e {
log.Errorln("Make MPub failed:> ", e)
return nil, e
}
if svr.msub, e = msq.NewSubscriber(op.Option().MSub); nil != e {
log.Errorln("Make MSub failed:> ", e)
return nil, e
}
svr._task_chan = make(chan restlet.TaskObject, 1024)
if op.Option().Debug {
restlet.SetDebug(op.Option().Debug)
log.Debugln("Restlet debug on!")
}
svr._task_handlers = make(map[string]restlet.TaskletHandler)
svr._msg_handlers = make(map[string]restlet.TaskletHandler)
svr.load_builtin_handlers()
return svr, nil
}
const X_CYGNUX_SIGN = "X-CYGNUX-SIGN"
func (self *KeplerService) Router() *mux.Router {
return self.router
}
func (self *KeplerService) Predicate(request *http.Request) (string, bool) {
sign := request.Header.Get(X_CYGNUX_SIGN)
log.Infoln("X-CYGNUX-SIGN:> ", sign)
return "", true
if sign == "" {
return "", false
}
return "", true
}
func (self *KeplerService) NewContext(request *http.Request) restlet.RequestContext {
return NewRequestContext(self, request)
}
func (self *KeplerService) makeRestletHandler(h restlet.RestletHandler, methods []string, cache *restlet.CacheController) http.Handler {
return restlet.MakeRestletHandler(h, self, self, methods, cache)
}
func (self *KeplerService) registerRestlet(prefix string, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
h := self.makeRestletHandler(handler, methods, cache)
log.Infof("Register:> Registering %s: %s ...", strings.Join(methods, ","), self.prefix + "/" + prefix)
self.router.Handle(self.prefix + "/" + prefix, h)
}
func (self *KeplerService) TaskProc() {
for t := range self._task_chan {
if h, ok := self._task_handlers[t.Queue]; ok {
if e := h.Handle(NewTaskContext(self), t.Params...); nil != e {
log.Errorf("TaskProc:> %s Handle failed:> %s \n", t.Queue, e)
}
} else {
log.Warnln("TaskProc:> Missing task handler for:", t.Queue)
}
}
}
func (self *KeplerService) MessageProc() {
wg := &sync.WaitGroup{}
for k, h := range self._msg_handlers {
wg.Add(1)
err := self.msub.Subscribe(k, func(data []byte) error {
return h.Handle(NewTaskContext(self), data)
})
if nil != err {
return
}
}
wg.Wait()
}
func (self *KeplerService) Serve() error {
go self.TaskProc()
go self.MessageProc()
return http.ListenAndServe(self.option.ListenAddr, self.router)
}
func (self *KeplerService) Register_RestletHandler(prefix string, methods []string, handler restlet.RestletHandler, cache_ctrl *restlet.CacheController) {
self.registerRestlet(prefix, methods, handler, cache_ctrl)
}
func (self *KeplerService) Register_TaskHandler(topic string, handler restlet.TaskletHandler) {
self._task_handlers[topic] = handler
}
func (self *KeplerService) Register_MessageHandler(topic string, handler restlet.TaskletHandler) {
self._msg_handlers[topic] = handler
}
func (self *KeplerService) load_builtin_handlers() {
for _, x := range builtin_restlet_handlers {
self.registerRestlet(x.prefix, x.methods, x.handler, x.cache_control)
}
for _, x := range builtin_tasklet_handlers {
self._task_handlers[x.name] = x.handler
}
for _, x := range builtin_message_handlers {
self._msg_handlers[x.name] = x.handler
}
}
var builtin_restlet_handlers []ServiceRestlet = []ServiceRestlet{}
var builtin_tasklet_handlers []ServiceTasklet = []ServiceTasklet{}
var builtin_message_handlers []ServiceMessagelet = []ServiceMessagelet{}
var builtin_initialize_procs []ServiceTasklet = []ServiceTasklet{}
/**
* RegisterRestletHandleFunc
* Register a Restlet handler to service.
*/
func RegisterRestletHandleFunc(f func(ctx restlet.RequestContext, url_params restlet.Parameters, queries restlet.Parameters, post_data []byte) (*restlet.RestletResult, error),
methods string, rcc *restlet.CacheController, prefixes ...string) {
_registerHandler(restlet.RestletHandleFunc(f), methods, rcc, prefixes...)
}
func _registerHandler(handler restlet.RestletHandler, methods string, rcc *restlet.CacheController, prefixes ...string) {
for _, prefix := range prefixes {
prefix = strings.TrimLeft(strings.TrimSpace(prefix), "/")
if prefix == "" {
continue
}
p := prefix
ms := strings.Split(methods, ",")
if len(ms) == 0 {
ms = []string{"*"}
} else {
for i, x := range ms {
ms[i] = strings.ToUpper(x)
}
}
builtin_restlet_handlers = append(builtin_restlet_handlers, ServiceRestlet{
methods: ms,
prefix: p,
handler: handler,
cache_control: rcc,
})
}
}
/**
* RegisterTasklet
* Register a background task.
*/
func RegisterTasklet(queue string, f func(restlet.TaskContext, ...interface{}) error) {
builtin_tasklet_handlers = append(builtin_tasklet_handlers, ServiceTasklet{name: queue, handler: restlet.TaskletHandlerFunc(f)})
}
/**
* RegisterMsgHandle
* Register a message handler
*/
func RegisterMsgHandle(queue string, f func(restlet.TaskContext, ...interface{}) error) {
builtin_message_handlers = append(builtin_message_handlers, ServiceMessagelet{name: queue, handler: restlet.TaskletHandlerFunc(f)})
}
/**
* RegisterInitialize
* Register an intialize function for service, function only called when service initialize funcation was called.
*/
func RegisterInitialize(name string, f func(restlet.TaskContext,...interface{}) error) {
builtin_initialize_procs = append(builtin_initialize_procs, ServiceTasklet{name:name, handler:restlet.TaskletHandler(f)})
}

Loading…
Cancel
Save