Kepler core
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

438 lines
12 KiB

package service
import (
"io"
"net/http"
"os"
"path"
"strings"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
_ "github.com/lib/pq"
"github.com/robfig/cron"
"cygnux.net/kepler/cache"
"cygnux.net/kepler/config"
"cygnux.net/kepler/database"
"cygnux.net/kepler/kv"
"cygnux.net/kepler/msq"
"cygnux.net/kepler/restlet"
)
// handleFailure answers the request with the given HTTP status code and a
// plain-text body containing message.
//
// r is not read; it is part of the signature so call sites can pass the
// request straight through. A failed body write can no longer be reported to
// the client, so it is only logged together with the number of bytes written.
func handleFailure(w http.ResponseWriter, r *http.Request, code int, message string) {
	// "text/plain" is the registered media type. The previous value
	// "plain/text" is not a media type at all, so clients could not rely on it.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.WriteHeader(code)
	if n, e := w.Write([]byte(message)); nil != e {
		log.Errorln("handleFailure:> write response failed:", n, e)
	}
}
// validateMethod reports whether r.Method is permitted by methods.
//
// An empty list, or a list whose first entry is "*" or empty, permits every
// method. Otherwise the request method has to match one of the entries.
// Matching ignores letter case and whitespace around an entry, so a list
// obtained by splitting "GET, POST" on commas behaves like "GET,POST".
func validateMethod(r *http.Request, methods ...string) bool {
	if len(methods) == 0 {
		return true
	}
	if first := strings.TrimSpace(methods[0]); first == "*" || first == "" {
		return true
	}
	for _, m := range methods {
		if strings.EqualFold(strings.TrimSpace(m), r.Method) {
			return true
		}
	}
	return false
}
// KeplerService bundles the HTTP router, the named backend handles (databases,
// caches, key-value stores, message publisher and subscriber) and the handler
// registries that Initialize and Serve consume.
type KeplerService struct {
// prefix is joined in front of each request handler's own prefix when Serve installs routes.
prefix string
// config is the configuration stored by Config; Serve reads the "service.*" listen and request-log keys from it.
config config.Config
// router receives every route installed by Serve.
router *mux.Router
// sessionKeeper, when non-nil, wraps each request handler before Serve routes it.
sessionKeeper func(next restlet.RequestHandler) restlet.RequestHandler
// db, cache and kvstore hold backend handles by name: Config stores the
// primary handle under "default" and each extra one under its configured name.
db map[string]database.DBI
cache map[string]cache.Cache
kvstore map[string]kv.KV
// mpub and msub are the message publisher and subscriber built by Config;
// messageProc subscribes the message handlers through msub.
mpub msq.Publisher
msub msq.Subscriber
// Handler registries, filled by the Register* methods.
initialHandlers []*InitializeHandler // run by Initialize
requestHandlers []*RequestHandler // routed by Serve for HTTP requests
messageHandlers []*MessageHandler // subscribed by messageProc, one per topic
scheduleHandlers []*ScheduleHandler // added to the cron scheduler by Serve
//taskChans chan restlet.TaskObject
//taskHandlers map[string]restlet.TaskletHandler
//msgHandlers map[string]restlet.TaskletHandler
//initHandlers []restlet.TaskletHandler
//cronHandlers []CronTask
}
// urlPrefix is the route prefix Config falls back to when the caller supplies none.
const urlPrefix = "/"
// NewService allocates a KeplerService with a strict-slash router and empty
// backend maps, then applies cfg and the optional URL prefix through Config.
//
// When Config fails, NewService returns nil together with that error.
func NewService(cfg config.Config, prefixes ...string) (*KeplerService, error) {
	log.Debugln("NewService:> ", prefixes)

	service := new(KeplerService)
	service.router = mux.NewRouter().StrictSlash(true)
	service.db = map[string]database.DBI{}
	service.cache = map[string]cache.Cache{}
	service.kvstore = map[string]kv.KV{}

	err := service.Config(cfg, prefixes...)
	if err != nil {
		return nil, err
	}
	return service, nil
}
// Config applies cfg to the service and opens every backend it describes.
//
// The first element of prefixes, when present, becomes the route prefix;
// otherwise urlPrefix is used. Backends are set up in a fixed order: the
// "database" section and each "database_<name>" listed under
// "service.extra_databases", then the same pattern for "cache" /
// "service.extra_caches" and "kv" / "service.extra_kvs", and finally the
// "publish" and "subscribe" sections. The primary handle of each kind is
// stored under the key "default", extras under their configured name.
//
// The first setup failure is logged and returned; backends configured before
// that point stay registered. When "service.debug" is true, restlet debugging
// is switched on.
func (svr *KeplerService) Config(cfg config.Config, prefixes ...string) error {
	prefix := urlPrefix
	if len(prefixes) > 0 {
		prefix = prefixes[0]
	}
	svr.prefix = prefix
	svr.config = cfg

	// Databases: primary first, then the optional named extras.
	defaultDB, dbErr := database.SetupDBI(cfg.Sub("database"))
	if dbErr != nil {
		log.Errorln("Open database failed:> ", dbErr)
		return dbErr
	}
	svr.db["default"] = defaultDB
	for _, name := range cfg.GetStringSlice("service.extra_databases") {
		extraDB, err := database.SetupDBI(cfg.Sub("database" + "_" + name))
		if err != nil {
			log.Errorf("Open database(%s) failed:> %s\n", name, err)
			return err
		}
		svr.db[name] = extraDB
	}

	// Caches.
	defaultCache, cacheErr := cache.SetupCache(cfg.Sub("cache"))
	if cacheErr != nil {
		log.Errorln("Make Cache failed:> ", cacheErr)
		return cacheErr
	}
	svr.cache["default"] = defaultCache
	for _, name := range cfg.GetStringSlice("service.extra_caches") {
		extraCache, err := cache.SetupCache(cfg.Sub("cache" + "_" + name))
		if err != nil {
			log.Errorf("Make Cache(%s) failed:> %s\n", name, err)
			return err
		}
		svr.cache[name] = extraCache
	}

	// Key-value stores.
	defaultKV, kvErr := kv.SetupKV(cfg.Sub("kv"))
	if kvErr != nil {
		log.Errorln("Make KVStore failed:> ", kvErr)
		return kvErr
	}
	svr.kvstore["default"] = defaultKV
	for _, name := range cfg.GetStringSlice("service.extra_kvs") {
		extraKV, err := kv.SetupKV(cfg.Sub("kv" + "_" + name))
		if err != nil {
			log.Errorf("Make KVStore(%s) failed:> %s\n", name, err)
			return err
		}
		svr.kvstore[name] = extraKV
	}

	// Message queue endpoints.
	publisher, pubErr := msq.SetupPublisher(cfg.Sub("publish"))
	if pubErr != nil {
		log.Errorln("Make MessagePub failed:> ", pubErr)
		return pubErr
	}
	svr.mpub = publisher

	subscriber, subErr := msq.SetupSubscriber(cfg.Sub("subscribe"))
	if subErr != nil {
		log.Errorln("Make MessageSub failed:> ", subErr)
		return subErr
	}
	svr.msub = subscriber

	if cfg.GetBool("service.debug") {
		restlet.SetDebug(true)
		log.Debugln("Restlet debug on!")
	}
	return nil
}
// Router returns the mux router that Serve installs routes on and serves from.
func (svr *KeplerService) Router() *mux.Router {
return svr.router
}
// Initialize runs the registered initialization handlers in slice order, each
// with a fresh task context.
//
// It stops at the first handler that reports an error, logs it and returns it.
// A nil result means every handler succeeded.
func (svr *KeplerService) Initialize() error {
	log.Infoln("Initializing ...")
	for i := range svr.initialHandlers {
		entry := svr.initialHandlers[i]
		log.Infoln("Initializing ", entry.Name, "...")
		if err := entry.Handler.Handle(NewTaskContext(svr)); err != nil {
			log.Errorln("Failed ...", err)
			return err
		}
	}
	return nil
}
//var sessionKeeper func(ctx restlet.Context, next http.Handler) http.Handler
//
//func SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
// sessionKeeper = ff
//}
// SetSessionKeeper installs ff as the wrapper that Serve applies to every
// request handler before routing it. Serve skips the wrapping while the
// keeper is nil.
func (svr *KeplerService) SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandler) {
svr.sessionKeeper = ff
}
// NewContext builds the request context for request, bound to this service,
// by delegating to NewRequestContext.
func (svr *KeplerService) NewContext(request *http.Request) restlet.RequestContext {
return NewRequestContext(svr, request)
}
//func (svr *KeplerService) makeRestletHandler(h restlet.RestletHandler, predictor restlet.RequestPredictor, methods []string, cache *restlet.CacheController) http.Handler {
// return restlet.MakeRestletHandler(h, predictor, svr, methods, cache)
//}
//
//func (svr *KeplerService) registerRestlet(prefix string, predictor restlet.RequestPredictor, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
// h := svr.makeRestletHandler(handler, predictor, methods, cache)
// log.Debugln(">>", svr.prefix, prefix)
// if svr.prefix == "" {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), prefix)
// svr.router.Handle(prefix, h)
// } else {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), svr.prefix+prefix)
// svr.router.Handle(svr.prefix+prefix, h)
// }
//}
//func (svr *KeplerService) TaskProc() {
// for t := range svr.taskChans {
// log.Debugln("KeplerService:>>>>>>>> Task Chan:", len(svr.taskChans))
// if h, ok := svr.taskHandlers[t.Queue]; ok {
// log.Debugln("KeplerService:>>> Task:", t.Queue)
// if e := h.Handle(NewTaskContext(svr), t.Params...); nil != e {
// log.Errorf("TaskProc:> %s Handler failed:> %s \n", t.Queue, e)
// }
// } else {
// log.Warnln("TaskProc:> Missing task handler for:", t.Queue)
// }
// }
//}
// messageProc subscribes every registered message handler to its topic on the
// service subscriber. Each delivered payload is passed to the handler together
// with a fresh task context; a handler error is logged and returned to the
// subscriber.
//
// If a Subscribe call fails, the error is logged and the function returns
// without subscribing the remaining handlers.
func (svr *KeplerService) messageProc() {
	wg := &sync.WaitGroup{}
	for _, h := range svr.messageHandlers {
		// Pin the handler for this iteration. The callback below runs later,
		// and before Go 1.22 the range variable is shared by all iterations,
		// so without this copy every subscription would end up calling the
		// last registered handler.
		h := h
		wg.Add(1)
		err := svr.msub.Subscribe(h.Topic, func(data []byte) error {
			log.Debugln("messageProc:> get msg via:", h.Topic)
			if e := h.Handler.Handle(NewTaskContext(svr), data); nil != e {
				log.Errorln("messageProc:> Call Handler failed:", e)
				return e
			}
			return nil
		})
		if nil != err {
			log.Errorln("messageProc:> Subscribe failed:", err)
			return
		}
	}
	// NOTE(review): nothing calls wg.Done, so once at least one subscription
	// succeeded this Wait blocks for the rest of the process lifetime. Serve
	// starts messageProc in its own goroutine, so only that goroutine is held.
	wg.Wait()
}
// runRequestHandler adapts a restlet request handler to an http.Handler.
//
// methods is a comma-separated allow-list checked with validateMethod; a
// request with another method is answered with 405 and goes no further. When
// predictor is non-nil, a request it rejects is answered with 403 and the
// predictor's message. Only requests that pass both checks reach handler. An
// error returned by handler is logged; no response is written for it because
// the handler may already have started one.
func (svr *KeplerService) runRequestHandler(handler restlet.RequestHandler, predictor restlet.RequestPredictor, methods string) http.Handler {
	// The allow-list is fixed per route, so split it once instead of on every request.
	allowed := strings.Split(methods, ",")
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !validateMethod(r, allowed...) {
			handleFailure(w, r, http.StatusMethodNotAllowed, "method not allowed")
			// Stop here: falling through would run the handler anyway and
			// write a second response on top of the failure.
			return
		}
		ctx := svr.NewContext(r)
		if predictor != nil {
			if s, b := predictor.Predicate(ctx); !b {
				handleFailure(w, r, http.StatusForbidden, s)
				return
			}
		}
		if e := handler.Handle(ctx, w, r); nil != e {
			log.Errorln("runRequestHandler:> handler failed:", r.Method, r.URL, e)
		}
	})
}
// runTaskHandler wraps handler in a niladic function suitable for the cron
// scheduler. Each invocation calls the handler with a fresh task context and
// logs the error if the handler reports one; success is silent.
func (svr *KeplerService) runTaskHandler(handler restlet.TaskletHandler) func() {
	return func() {
		err := handler.Handle(NewTaskContext(svr))
		if err == nil {
			return
		}
		log.Errorln("Run cron task failed:>", handler, err)
	}
}
// Serve starts the service and blocks in http.ListenAndServe.
//
// In order, it:
//   - starts messageProc in its own goroutine;
//   - when schedule handlers are registered, adds each to a cron scheduler
//     and starts it (an invalid schedule aborts Serve with that error); the
//     scheduler is stopped when Serve returns;
//   - installs every registered request handler on the router under
//     path.Join(prefix, handler prefix), wrapped by the session keeper when
//     one is set;
//   - listens on "service.listen" (default ":8080").
//
// When "service.log_request" is true, requests are logged in combined log
// format to the file named by "service.log_request_file", or to stdout when
// that key is empty or the file cannot be opened.
//
// The returned error is the cron registration error or whatever
// http.ListenAndServe returns.
func (svr *KeplerService) Serve() error {
	go svr.messageProc()

	var pCron *cron.Cron
	if len(svr.scheduleHandlers) > 0 {
		pCron = cron.New()
		for _, x := range svr.scheduleHandlers {
			log.Infoln("Adding cron task:", x.Schedule, x.Handler.Handle)
			if e := pCron.AddFunc(x.Schedule, svr.runTaskHandler(x.Handler)); nil != e {
				return e
			}
		}
		pCron.Start()
	}
	defer func() {
		if pCron != nil {
			pCron.Stop()
		}
	}()

	for _, h := range svr.requestHandlers {
		fullPrefix := path.Join(svr.prefix, h.Prefix)
		log.Infoln("Routing :", fullPrefix, h.Methods)
		var handler restlet.RequestHandler = h.Handler
		if svr.sessionKeeper != nil {
			handler = svr.sessionKeeper(handler)
		}
		svr.router.Handle(fullPrefix, svr.runRequestHandler(handler, h.Predictor, h.Methods))
	}

	listenAddr := svr.config.GetString("service.listen", ":8080")
	logRequestFile := svr.config.GetString("service.log_request_file", "")
	log.Infoln("Service:> Listening on ", listenAddr)

	if !svr.config.GetBool("service.log_request") {
		return http.ListenAndServe(listenAddr, svr.router)
	}

	var logIO io.Writer = os.Stdout
	if logRequestFile != "" {
		f, e := os.OpenFile(logRequestFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0640)
		if nil != e {
			// Keep serving, but say why the request log is not where the
			// configuration asked for it.
			log.Warnln("Service:> open request log failed, logging requests to stdout:", logRequestFile, e)
		} else {
			defer f.Close()
			logIO = f
		}
	}
	return http.ListenAndServe(listenAddr, handlers.CombinedLoggingHandler(logIO, svr.router))
}
// RegisterInitialize appends handler, labelled name, to the list of
// initialization handlers. Initialize runs the registered handlers in the
// order they were added and logs name before each run.
func (svr *KeplerService) RegisterInitialize(name string, handler restlet.TaskletHandler) {
svr.initialHandlers = append(svr.initialHandlers, &InitializeHandler{
Name: name,
Handler: handler,
})
}
// RegisterRequest queues handler to be routed under prefix when Serve runs.
//
// methods is the optional allow-list of HTTP methods; it is stored as one
// comma-joined string, and an empty list is stored as "" (no restriction).
//
// The note this comment replaces listed the following as registrable handler
// kinds (not verifiable from this file): http.Handler, restlet.RestHandler,
// restlet.RawHandler, restlet.RestHandleFunc, restlet.RawHandleFunc and
// http.HandleFunc.
func (svr *KeplerService) RegisterRequest(prefix string, handler restlet.RequestHandler, methods ...string) {
	entry := &RequestHandler{
		Prefix:  prefix,
		Handler: handler,
		// Joining an empty list yields "", the same value the field holds
		// when it is left unset.
		Methods: strings.Join(methods, ","),
	}
	svr.requestHandlers = append(svr.requestHandlers, entry)
}
// RegisterMessage appends handler to the list of message handlers for topic.
// messageProc later subscribes each registered handler to its topic.
//
// The note this comment replaces listed TaskletHandler and TaskletFunc as the
// accepted handler kinds (not verifiable from this file).
func (svr *KeplerService) RegisterMessage(topic string, handler restlet.TaskletHandler) {
svr.messageHandlers = append(svr.messageHandlers, &MessageHandler{
Topic: topic,
Handler: handler,
})
}
// RegisterSchedule appends handler to the list of scheduled tasks. Serve adds
// each registered handler to the cron scheduler using schedule as its spec.
//
// The note this comment replaces listed TaskletHandler and TaskletHandleFunc
// as the accepted handler kinds (not verifiable from this file).
func (svr *KeplerService) RegisterSchedule(schedule string, handler restlet.TaskletHandler) {
svr.scheduleHandlers = append(svr.scheduleHandlers, &ScheduleHandler{
Schedule: schedule,
Handler: handler,
})
}
var (
// builtinService is the package-level service instance that the
// package-level functions (Router, Serve, Config, Initialize, Register*, ...)
// operate on. It starts with a strict-slash router and empty backend maps;
// Config must be called to populate it.
builtinService = &KeplerService{
router: mux.NewRouter().StrictSlash(true),
db: make(map[string]database.DBI),
cache: make(map[string]cache.Cache),
kvstore: make(map[string]kv.KV),
}
)
// Router returns the mux router of the package-level builtinService.
func Router() *mux.Router {
return builtinService.router
}
// Serve runs KeplerService.Serve on the package-level builtinService.
func Serve() error {
return builtinService.Serve()
}
// Config applies cfg and the optional route prefix to the package-level
// builtinService and returns the error from KeplerService.Config.
func Config(cfg config.Config, prefixes ...string) error {
return builtinService.Config(cfg, prefixes...)
}
// Initialize runs the initialization handlers registered on the package-level
// builtinService.
func Initialize() error {
return builtinService.Initialize()
}
// SetSessionKeeper installs ff as the request-handler wrapper of the
// package-level builtinService.
func SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandler) {
builtinService.SetSessionKeeper(ff)
}
// RegisterInitialize registers an initialization handler, labelled name, on
// the package-level builtinService.
func RegisterInitialize(name string, handler restlet.TaskletHandler) {
builtinService.RegisterInitialize(name, handler)
}
// RegisterRequest registers an HTTP request handler for prefix, optionally
// restricted to methods, on the package-level builtinService.
func RegisterRequest(prefix string, handler restlet.RequestHandler, methods ...string) {
builtinService.RegisterRequest(prefix, handler, methods...)
}
// RegisterMessage registers a message handler for topic on the package-level
// builtinService.
func RegisterMessage(topic string, handler restlet.TaskletHandler) {
builtinService.RegisterMessage(topic, handler)
}
// RegisterSchedule registers a scheduled task with the given cron schedule on
// the package-level builtinService.
func RegisterSchedule(schedule string, handler restlet.TaskletHandler) {
builtinService.RegisterSchedule(schedule, handler)
}