Browse Source

Updated.

v0.7
Mingcai SHEN 5 years ago
parent
commit
5bfe73ca53
13 changed files with 148 additions and 218 deletions
  1. +2
    -2
      kv/kv.go
  2. +14
    -14
      kv/mem.go
  3. +27
    -27
      kv/redis.go
  4. +27
    -27
      kv/sentinel.go
  5. +1
    -1
      kv/types.go
  6. +1
    -27
      list/list.go
  7. +13
    -13
      list/redis.go
  8. +8
    -8
      logging/logging.go
  9. +10
    -10
      msq/msq.go
  10. +21
    -21
      msq/nsq.go
  11. +24
    -24
      msq/redis.go
  12. +0
    -9
      service/context.go
  13. +0
    -35
      service/service.go

+ 2
- 2
kv/kv.go View File

@ -11,13 +11,13 @@ import (
func SetupKV(cfg config.Config) (KV, error) {
switch cfg.GetString("driver") {
case "redis":
return new_redis_kv(
return newRedisKv(
cfg.GetString("host", "redis"),
cfg.GetUInt16("port", 6379),
cfg.GetInt64("db", 0),
cfg.GetString("password"))
case "sentinel":
return new_sentinel_kv(
return newSentinelKv(
cfg.GetString("host", "sentinel"),
cfg.GetUInt16("port", 6379),
cfg.GetString("master", "redis"),

+ 14
- 14
kv/mem.go View File

@ -9,25 +9,25 @@ import (
type MemKVStore struct {
}
func new_mem_kv(max string) (KV, error) {
func newMemKv(max string) (KV, error) {
rc := &MemKVStore{}
return rc, nil
}
// Get Cached bytes via k, return empty bytes and false if failed.
func (self *MemKVStore) Get(k string) ([]byte, error) {
func (mkvs *MemKVStore) Get(k string) ([]byte, error) {
return []byte(""), nil
}
// Set bytes to cache with given key
func (self *MemKVStore) Set(k string, d []byte, x ...int64) (e error) {
func (mkvs *MemKVStore) Set(k string, d []byte, x ...int64) (e error) {
return e
}
//
func (self *MemKVStore) GetJSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (mkvs *MemKVStore) GetJSON(k string, o interface{}) error {
bs, e := mkvs.Get(k)
if nil != e {
return e
}
@ -36,18 +36,18 @@ func (self *MemKVStore) GetJSON(k string, o interface{}) error {
}
//
func (self *MemKVStore) SetJSON(k string, v interface{}, x ...int64) error {
func (mkvs *MemKVStore) SetJSON(k string, v interface{}, x ...int64) error {
bs, e := json.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = mkvs.Set(k, bs, x...)
return e
}
//
func (self *MemKVStore) GetBSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (mkvs *MemKVStore) GetBSON(k string, o interface{}) error {
bs, e := mkvs.Get(k)
if nil != e {
return e
}
@ -56,27 +56,27 @@ func (self *MemKVStore) GetBSON(k string, o interface{}) error {
}
//
func (self *MemKVStore) SetBSON(k string, v interface{}, x ...int64) error {
func (mkvs *MemKVStore) SetBSON(k string, v interface{}, x ...int64) error {
bs, e := bson.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = mkvs.Set(k, bs, x...)
return e
}
//
func (self *MemKVStore) Del(k ...string) error {
func (mkvs *MemKVStore) Del(k ...string) error {
return nil
}
//
func (self *MemKVStore) Keys(p string) ([]string, error) {
func (mkvs *MemKVStore) Keys(p string) ([]string, error) {
return nil, nil
}
//
func (self *MemKVStore) Expire(k string, x ...int64) error {
func (mkvs *MemKVStore) Expire(k string, x ...int64) error {
return nil
}

+ 27
- 27
kv/redis.go View File

@ -14,7 +14,7 @@ type redisKvStore struct {
redis *redis.Client
}
func new_redis_kv(host string, port uint16, db int64, password string) (KV, error) {
func newRedisKv(host string, port uint16, db int64, password string) (KV, error) {
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
DB: int(db),
@ -42,13 +42,13 @@ func new_redis_kv(host string, port uint16, db int64, password string) (KV, erro
}
// Get Cached bytes via k, return empty bytes and false if failed.
func (self *redisKvStore) Get(k string) (bs []byte, e error) {
if self.verbose {
func (rkvs *redisKvStore) Get(k string) (bs []byte, e error) {
if rkvs.verbose {
t1 := time.Now()
defer log_timing(t1, "KV KV GET", k, e)
}
var s string
s, e = self.redis.Get(k).Result()
s, e = rkvs.redis.Get(k).Result()
if nil != e {
return nil, e
@ -57,27 +57,27 @@ func (self *redisKvStore) Get(k string) (bs []byte, e error) {
}
// Set bytes to cache with given key
func (self *redisKvStore) Set(k string, d []byte, x ...int64) (e error) {
if self.verbose {
func (rkvs *redisKvStore) Set(k string, d []byte, x ...int64) (e error) {
if rkvs.verbose {
t1 := time.Now()
defer log_timing(t1, "KV KV SET", k, e)
}
var exp time.Duration
if len(x) > 0 {
exp = time.Duration(x[0]) * time.Second
//_, e = self.redis.SetXX(k, d, time.Duration(x[0])*time.Second).Result() //SetEx(k, x[0], d)
//_, e = rkvs.redis.SetXX(k, d, time.Duration(x[0])*time.Second).Result() //SetEx(k, x[0], d)
} else {
//_, e = self.redis.Set(k, d).Result()
//_, e = rkvs.redis.Set(k, d).Result()
exp = 0
}
_, e = self.redis.Set(k, d, exp).Result()
_, e = rkvs.redis.Set(k, d, exp).Result()
return e
}
//
func (self *redisKvStore) GetJSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (rkvs *redisKvStore) GetJSON(k string, o interface{}) error {
bs, e := rkvs.Get(k)
if nil != e {
return e
}
@ -86,18 +86,18 @@ func (self *redisKvStore) GetJSON(k string, o interface{}) error {
}
//
func (self *redisKvStore) SetJSON(k string, v interface{}, x ...int64) error {
func (rkvs *redisKvStore) SetJSON(k string, v interface{}, x ...int64) error {
bs, e := json.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = rkvs.Set(k, bs, x...)
return e
}
//
func (self *redisKvStore) GetBSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (rkvs *redisKvStore) GetBSON(k string, o interface{}) error {
bs, e := rkvs.Get(k)
if nil != e {
return e
}
@ -106,37 +106,37 @@ func (self *redisKvStore) GetBSON(k string, o interface{}) error {
}
//
func (self *redisKvStore) SetBSON(k string, v interface{}, x ...int64) error {
func (rkvs *redisKvStore) SetBSON(k string, v interface{}, x ...int64) error {
bs, e := bson.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = rkvs.Set(k, bs, x...)
return e
}
//
func (self *redisKvStore) Del(k ...string) error {
if self.verbose {
func (rkvs *redisKvStore) Del(k ...string) error {
if rkvs.verbose {
t1 := time.Now()
defer log_timing(t1, "KV KV DEL", k)
}
_, e := self.redis.Del(k...).Result()
_, e := rkvs.redis.Del(k...).Result()
return e
}
//
func (self *redisKvStore) Keys(p string) ([]string, error) {
if self.verbose {
func (rkvs *redisKvStore) Keys(p string) ([]string, error) {
if rkvs.verbose {
t1 := time.Now()
defer log_timing(t1, "KV KV KEYS", p)
}
return self.redis.Keys(p).Result()
return rkvs.redis.Keys(p).Result()
}
//
func (self *redisKvStore) Expire(k string, x ...int64) error {
if self.verbose {
func (rkvs *redisKvStore) Expire(k string, x ...int64) error {
if rkvs.verbose {
t1 := time.Now()
defer log_timing(t1, "KV KV EXPIRE", k)
}
@ -144,8 +144,8 @@ func (self *redisKvStore) Expire(k string, x ...int64) error {
if len(x) > 0 {
exp = x[0]
} else {
exp = DEFAULT_EXPIRE_VAL
exp = DefaultExpireVal
}
_, e := self.redis.Expire(k, time.Duration(exp)*time.Second).Result()
_, e := rkvs.redis.Expire(k, time.Duration(exp)*time.Second).Result()
return e
}

+ 27
- 27
kv/sentinel.go View File

@ -14,7 +14,7 @@ type sentinelKVStore struct {
redis *redis.Client
}
func new_sentinel_kv(host string, port uint16, master_name string, db int64, password string) (KV, error) {
func newSentinelKv(host string, port uint16, master_name string, db int64, password string) (KV, error) {
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: master_name,
SentinelAddrs: []string{fmt.Sprintf("%s:%d", host, port)},
@ -30,13 +30,13 @@ func new_sentinel_kv(host string, port uint16, master_name string, db int64, pas
}
// Get Cached bytes via k, return empty bytes and false if failed.
func (self *sentinelKVStore) Get(k string) (bs []byte, e error) {
if self.verbose {
func (skvs *sentinelKVStore) Get(k string) (bs []byte, e error) {
if skvs.verbose {
t1 := time.Now()
defer log_timing(t1, "Sentinel KV GET", k, e)
}
var s string
s, e = self.redis.Get(k).Result()
s, e = skvs.redis.Get(k).Result()
if nil != e {
return nil, e
@ -45,8 +45,8 @@ func (self *sentinelKVStore) Get(k string) (bs []byte, e error) {
}
// Set bytes to cache with given key
func (self *sentinelKVStore) Set(k string, d []byte, x ...int64) (e error) {
if self.verbose {
func (skvs *sentinelKVStore) Set(k string, d []byte, x ...int64) (e error) {
if skvs.verbose {
t1 := time.Now()
defer log_timing(t1, "Sentinel KV SET", k, e)
}
@ -54,19 +54,19 @@ func (self *sentinelKVStore) Set(k string, d []byte, x ...int64) (e error) {
var exp time.Duration
if len(x) > 0 {
exp = time.Duration(x[0]) * time.Second
//_, e = self.redis.SetXX(k, d, time.Duration(x[0])*time.Second).Result() //SetEx(k, x[0], d)
//_, e = skvs.redis.SetXX(k, d, time.Duration(x[0])*time.Second).Result() //SetEx(k, x[0], d)
} else {
//_, e = self.redis.Set(k, d).Result()
//_, e = skvs.redis.Set(k, d).Result()
exp = 0
}
_, e = self.redis.Set(k, d, exp).Result()
_, e = skvs.redis.Set(k, d, exp).Result()
return e
}
//
func (self *sentinelKVStore) GetJSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (skvs *sentinelKVStore) GetJSON(k string, o interface{}) error {
bs, e := skvs.Get(k)
if nil != e {
return e
}
@ -75,18 +75,18 @@ func (self *sentinelKVStore) GetJSON(k string, o interface{}) error {
}
//
func (self *sentinelKVStore) SetJSON(k string, v interface{}, x ...int64) error {
func (skvs *sentinelKVStore) SetJSON(k string, v interface{}, x ...int64) error {
bs, e := json.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = skvs.Set(k, bs, x...)
return e
}
//
func (self *sentinelKVStore) GetBSON(k string, o interface{}) error {
bs, e := self.Get(k)
func (skvs *sentinelKVStore) GetBSON(k string, o interface{}) error {
bs, e := skvs.Get(k)
if nil != e {
return e
}
@ -95,38 +95,38 @@ func (self *sentinelKVStore) GetBSON(k string, o interface{}) error {
}
//
func (self *sentinelKVStore) SetBSON(k string, v interface{}, x ...int64) error {
func (skvs *sentinelKVStore) SetBSON(k string, v interface{}, x ...int64) error {
bs, e := bson.Marshal(v)
if nil != e {
return e
}
e = self.Set(k, bs, x...)
e = skvs.Set(k, bs, x...)
return e
}
//
func (self *sentinelKVStore) Del(k ...string) error {
if self.verbose {
func (skvs *sentinelKVStore) Del(k ...string) error {
if skvs.verbose {
t1 := time.Now()
defer log_timing(t1, "Sentinel KV DEL", k)
}
_, e := self.redis.Del(k...).Result()
_, e := skvs.redis.Del(k...).Result()
return e
}
//
func (self *sentinelKVStore) Keys(p string) ([]string, error) {
if self.verbose {
func (skvs *sentinelKVStore) Keys(p string) ([]string, error) {
if skvs.verbose {
t1 := time.Now()
defer log_timing(t1, "Sentinel KV KEYS", p)
}
return self.redis.Keys(p).Result()
return skvs.redis.Keys(p).Result()
}
//
func (self *sentinelKVStore) Expire(k string, x ...int64) error {
if self.verbose {
func (skvs *sentinelKVStore) Expire(k string, x ...int64) error {
if skvs.verbose {
t1 := time.Now()
defer log_timing(t1, "Sentinel KV EXPIRE", k)
}
@ -135,8 +135,8 @@ func (self *sentinelKVStore) Expire(k string, x ...int64) error {
if len(x) > 0 {
exp = x[0]
} else {
exp = DEFAULT_EXPIRE_VAL
exp = DefaultExpireVal
}
_, e := self.redis.Expire(k, time.Duration(exp)*time.Second).Result()
_, e := skvs.redis.Expire(k, time.Duration(exp)*time.Second).Result()
return e
}

+ 1
- 1
kv/types.go View File

@ -1,6 +1,6 @@
package kv
const DEFAULT_EXPIRE_VAL int64 = 1800
const DefaultExpireVal int64 = 1800
type KV interface {
Keys(p string) ([]string, error)

+ 1
- 27
list/list.go View File

@ -9,7 +9,7 @@ import (
func SetupList(cfg config.Config) (List, error) {
switch cfg.GetString("driver") {
case "redis":
return new_redis_list(
return newRedisList(
cfg.GetString("host", "redis"),
cfg.GetUInt16("port", 6379),
cfg.GetInt64("db", 0),
@ -26,29 +26,3 @@ func SetupList(cfg config.Config) (List, error) {
}
return nil, nil
}
// ds : dataSource: schema://username:password@host:port/db?options
//func NewList(ds string) (List, error) {
// urx, e := misc.ParseURL(ds)
// if nil != e {
// return nil, e
// }
// switch urx.Scheme(){
// case "redis":
// db, _ := urx.Parameters().GetInt64("db", 0)
// return new_redis_list(urx.Host("redis"), urx.Port(6379), db, urx.Password())
// //case "sentinel":
// // db, _ := urx.Parameters().GetInt64("db", 0)
// // enabled, _ := urx.Parameters().GetBool("enabled", true)
// // master, _ := urx.Parameters().GetString("master", "redis")
// // return new_sentinel_cache(urx.Host("sentinel"), urx.Port(6379), master, db, urx.Password(), enabled)
// //case "mem":
// // max,_ := urx.Parameters().GetString("max", "32m")
// // return new_mem_cache(max)
// default:
// return nil, errors.New("Unsupported engine type: " + urx.Scheme())
//
// }
// return nil, nil
//}

+ 13
- 13
list/redis.go View File

@ -17,7 +17,7 @@ type redisBSONList struct {
redisSimpleList
}
func new_redis_list(host string, port uint16, db int64, password string) (List, error) {
func newRedisList(host string, port uint16, db int64, password string) (List, error) {
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
DB: int(db),
@ -31,31 +31,31 @@ func new_redis_list(host string, port uint16, db int64, password string) (List,
return rc, nil
}
func (self *redisSimpleList) LPush(k string, data interface{}) error {
_, e := self.redis.LPush(k, data).Result()
func (rsl *redisSimpleList) LPush(k string, data interface{}) error {
_, e := rsl.redis.LPush(k, data).Result()
return e
}
func (self *redisSimpleList) RPush(k string, data interface{}) error {
_, e := self.redis.RPush(k, data).Result()
func (rsl *redisSimpleList) RPush(k string, data interface{}) error {
_, e := rsl.redis.RPush(k, data).Result()
return e
}
func (self *redisSimpleList) LPop(k string) ([]byte, error) {
if s, e := self.redis.LPop(k).Result(); nil != e {
func (rsl *redisSimpleList) LPop(k string) ([]byte, error) {
if s, e := rsl.redis.LPop(k).Result(); nil != e {
return nil, e
} else {
return []byte(s), nil
}
}
func (self *redisSimpleList) RPop(k string) ([]byte, error) {
if s, e := self.redis.RPop(k).Result(); nil != e {
func (rsl *redisSimpleList) RPop(k string) ([]byte, error) {
if s, e := rsl.redis.RPop(k).Result(); nil != e {
return nil, e
} else {
return []byte(s), nil
}
}
func (self *redisSimpleList) LRange(k string, start int64, end int64) ([][]byte, error) {
if ss, e := self.redis.LRange(k, start, end).Result(); nil != e {
func (rsl *redisSimpleList) LRange(k string, start int64, end int64) ([][]byte, error) {
if ss, e := rsl.redis.LRange(k, start, end).Result(); nil != e {
return nil, e
} else {
var bbs [][]byte
@ -65,7 +65,7 @@ func (self *redisSimpleList) LRange(k string, start int64, end int64) ([][]byte,
return bbs, nil
}
}
func (self *redisSimpleList) LTrim(k string, start int64, end int64) error {
_, e := self.redis.LTrim(k, start, end).Result()
func (rsl *redisSimpleList) LTrim(k string, start int64, end int64) error {
_, e := rsl.redis.LTrim(k, start, end).Result()
return e
}

+ 8
- 8
logging/logging.go View File

@ -11,14 +11,14 @@ import (
"cygnux.net/kepler/config"
)
const DEFAULT_FORMAT = "TEXT"
const DefaultFormat = "TEXT"
var OUTPUT_FILE *os.File
var OutputFile *os.File
type PlainFormatter struct {
}
func (self *PlainFormatter) Format(entry *log.Entry) ([]byte, error) {
func (pf *PlainFormatter) Format(entry *log.Entry) ([]byte, error) {
bytes := []byte(fmt.Sprintf("[%s %s] %s\n", entry.Time.Format(time.RFC3339), strings.ToUpper(entry.Level.String()), entry.Message))
return bytes, nil
}
@ -36,15 +36,15 @@ func InitializeLogging(cfg config.Config, useStd bool, level ...string) error {
if useStd || cfg.GetString("filename") == "" {
log.SetOutput(os.Stdout)
OUTPUT_FILE = nil
OutputFile = nil
} else {
f, e := os.OpenFile(cfg.GetString("filename"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if nil != e {
fmt.Errorf("Open file <%s> for logging failed<%v>!\n", cfg.GetString("filename"), e)
_ = fmt.Errorf("Open file <%s> for logging failed<%v>!\n", cfg.GetString("filename"), e)
return e
} else {
log.SetOutput(f)
OUTPUT_FILE = f
OutputFile = f
}
}
if strings.ToLower(cfg.GetString("format")) == "json" {
@ -58,7 +58,7 @@ func InitializeLogging(cfg config.Config, useStd bool, level ...string) error {
}
func DeinitializeLogging() {
if nil != OUTPUT_FILE {
OUTPUT_FILE.Close()
if nil != OutputFile {
_ = OutputFile.Close()
}
}

+ 10
- 10
msq/msq.go View File

@ -9,11 +9,11 @@ import (
func SetupPublisher(cfg config.Config) (Publisher, error) {
switch cfg.GetString("driver") {
case "nsq":
return new_nsq_producer(
return newNsqProducer(
cfg.GetString("address", "nsqd:4150"),
cfg.GetString("prefix", "default"))
case "redis":
return new_redis_producer(
return newRedisProducer(
cfg.GetString("host", "redis"),
cfg.GetUInt16("port", 6379),
cfg.GetInt64("db", 0),
@ -29,28 +29,28 @@ func SetupPublisher(cfg config.Config) (Publisher, error) {
func SetupSubscriber(cfg config.Config) (Subscriber, error) {
switch cfg.GetString("driver") {
case "nsq":
return new_nsq_consumer("nsqd",
return newNsqConsumer("nsqd",
cfg.GetString("address", "nsqd:4150"),
cfg.GetString("prefix", "default"),
cfg.GetString("channel", "default"),
cfg.GetInt("timeout", 180),
)
case "nsqlookup":
return new_nsq_consumer("nsqlookupd",
return newNsqConsumer("nsqlookupd",
cfg.GetString("address", "nsqlookupd:4160"),
cfg.GetString("prefix", "default"),
cfg.GetString("channel", "default"),
cfg.GetInt("timeout", 180),
)
case "redis":
return new_redis_consumer(
return newRedisConsumer(
cfg.GetString("host", "redis"),
cfg.GetUInt16("port", 6379),
cfg.GetInt64("db", 0),
cfg.GetString("password"),
cfg.GetString("prefix"),
cfg.GetString("failure_suffix", "__failed"),
cfg.GetString("running_suffix", "__running"),
cfg.GetString("failureSuffix", "__failed"),
cfg.GetString("runningSuffix", "__running"),
cfg.GetBool("verbose"))
default:
return nil, errors.New("Not supported: " + cfg.GetString("driver"))
@ -70,7 +70,7 @@ func NewPublisher(s string) (Publisher, error) {
if p := u.Query().Get("prefix"); p != "" {
prefix = p
}
return new_nsq_producer(u.Host, prefix)
return newNsqProducer(u.Host, prefix)
default:
return nil, errors.New("Not supported: " + u.Scheme)
}
@ -96,7 +96,7 @@ func NewSubscriber(s string) (Subscriber, error) {
if p := u.Query().Get("channel"); p != "" {
channel = p
}
return new_nsq_consumer(kind, u.Host, prefix, channel)
return newNsqConsumer(kind, u.Host, prefix, channel)
case "nsqlookup":
prefix := "default"
if p := u.Query().Get("prefix"); p != "" {
@ -106,7 +106,7 @@ func NewSubscriber(s string) (Subscriber, error) {
if p := u.Query().Get("channel"); p != "" {
channel = p
}
return new_nsq_consumer("nsqlookupd", u.Host, prefix, channel)
return newNsqConsumer("nsqlookupd", u.Host, prefix, channel)
default:
return nil, errors.New("Not supported: " + u.Scheme)
}

+ 21
- 21
msq/nsq.go View File

@ -24,8 +24,8 @@ func (l logger) Output(calldepth int, s string) error {
return nil
}
func new_nsq_producer(address string, prefix string) (Publisher, error) {
log.Infoln("new_nsq_producer: ", address, prefix)
func newNsqProducer(address string, prefix string) (Publisher, error) {
log.Infoln("newNsqProducer: ", address, prefix)
config := nsq.NewConfig()
w, e := nsq.NewProducer(address, config)
if nil != e {
@ -35,15 +35,15 @@ func new_nsq_producer(address string, prefix string) (Publisher, error) {
return producer, nil
}
func (self *nsqProducer) Publish(topic string, data []byte) error {
if self.debug {
func (nsqp *nsqProducer) Publish(topic string, data []byte) error {
if nsqp.debug {
log.Debugln("nsqProducer.Publish:> ", topic)
}
return self.p.Publish(self.prefix+"."+topic, data)
return nsqp.p.Publish(nsqp.prefix+"."+topic, data)
}
func (self *nsqProducer) Close() {
self.p.Stop()
func (nsqp *nsqProducer) Close() {
nsqp.p.Stop()
}
const (
@ -61,8 +61,8 @@ type nsqConsumer struct {
debug bool
}
func new_nsq_consumer(kind string, address string, prefix string, channel string, timeout int) (Subscriber, error) {
log.Infoln("new_nsq_consumer:> ", kind, address, prefix, channel)
func newNsqConsumer(kind string, address string, prefix string, channel string, timeout int) (Subscriber, error) {
log.Infoln("newNsqConsumer:> ", kind, address, prefix, channel)
var k uint8
switch strings.ToLower(kind) {
case "nsqd":
@ -78,20 +78,20 @@ func new_nsq_consumer(kind string, address string, prefix string, channel string
return consumer, nil
}
func (self *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) error {
func (nsqc *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) error {
log.Infoln("nsqConsumer.Subscribe:>", topic)
channel := self.channel
channel := nsqc.channel
if len(chn) > 0 && chn[0] != "" {
channel = chn[0]
}
self.config.MsgTimeout = time.Duration(self.timeout) * time.Second
q, e := nsq.NewConsumer(self.prefix+"."+topic, channel, self.config)
nsqc.config.MsgTimeout = time.Duration(nsqc.timeout) * time.Second
q, e := nsq.NewConsumer(nsqc.prefix+"."+topic, channel, nsqc.config)
if nil != e {
return e
}
q.SetLogger(nil, nsq.LogLevelError)
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
if self.debug {
if nsqc.debug {
log.Debugln("nsqConsumer.Subscribe:> Get message:>", )
}
@ -102,10 +102,10 @@ func (self *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e
for t := range ticker.C {
message.Touch()
//counter += 1
if self.debug {
if nsqc.debug {
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t)
}
//if counter*5 >= self.timeout {
//if counter*5 >= nsqc.timeout {
// message.Finish()
// ticker.Stop()
//}
@ -114,21 +114,21 @@ func (self *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e
e := hf(message.Body)
if nil != e {
//message.Requeue()
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", self.prefix+"."+topic, e)
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", nsqc.prefix+"."+topic, e)
}
ticker.Stop()
return e
}))
switch self.kind {
switch nsqc.kind {
case NSQ_NSQD:
return q.ConnectToNSQD(self.address)
return q.ConnectToNSQD(nsqc.address)
case NSQ_NSQLOOKUPD:
return q.ConnectToNSQLookupd(self.address)
return q.ConnectToNSQLookupd(nsqc.address)
default:
return errors.New("Invalid kind.")
}
}
func (self *nsqConsumer) Close() {
func (nsqc *nsqConsumer) Close() {
}

+ 24
- 24
msq/redis.go View File

@ -15,14 +15,14 @@ type redisProducer struct {
}
type redisConsumer struct {
verbose bool
prefix string
failure_suffix string
running_suffix string
redis *redis.Client
verbose bool
prefix string
failureSuffix string
runningSuffix string
redis *redis.Client
}
func new_redis_producer(host string, port uint16, db int64, password string, prefix string, verbose bool) (Publisher, error) {
func newRedisProducer(host string, port uint16, db int64, password string, prefix string, verbose bool) (Publisher, error) {
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
DB: int(db),
@ -36,20 +36,20 @@ func new_redis_producer(host string, port uint16, db int64, password string, pre
return rc, nil
}
func (self *redisProducer) Publish(topic string, data []byte) error {
if self.verbose {
func (rp *redisProducer) Publish(topic string, data []byte) error {
if rp.verbose {
log.Debugln("redisProducer.Publish:> ", topic)
}
_, e := self.redis.RPush(self.prefix+"."+topic, data).Result()
_, e := rp.redis.RPush(rp.prefix+"."+topic, data).Result()
return e
}
func (self *redisProducer) Close() {
self.redis.Close()
func (rp *redisProducer) Close() {
_ = rp.redis.Close()
}
func new_redis_consumer(host string, port uint16, db int64, password string, prefix string, failure_sfx, running_suffix string, verbose bool) (Subscriber, error) {
func newRedisConsumer(host string, port uint16, db int64, password string, prefix string, failure_sfx, running_suffix string, verbose bool) (Subscriber, error) {
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
DB: int(db),
@ -60,27 +60,27 @@ func new_redis_consumer(host string, port uint16, db int64, password string, pre
return nil, e
}
rc := &redisConsumer{
redis: client,
prefix: prefix,
failure_suffix: failure_sfx,
verbose: verbose,
running_suffix: running_suffix}
redis: client,
prefix: prefix,
failureSuffix: failure_sfx,
verbose: verbose,
runningSuffix: running_suffix}
return rc, nil
}
func (self *redisConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) error {
func (rc *redisConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) error {
log.Infoln("redisConsumer.Subscribe:>", topic)
go func() {
key := self.prefix + "." + topic
fkey := key + self.failure_suffix
key := rc.prefix + "." + topic
fkey := key + rc.failureSuffix
for {
s, e := self.redis.LPop(key).Result()
s, e := rc.redis.LPop(key).Result()
if nil != e {
time.Sleep(time.Second * 15)
} else {
bs := []byte(s)
if e := hf(bs); nil != e {
self.redis.LPush(fkey, bs)
rc.redis.LPush(fkey, bs)
} else {
}
@ -90,6 +90,6 @@ func (self *redisConsumer) Subscribe(topic string, hf HandleFunc, chn ...string)
return nil
}
func (self *redisConsumer) Close() {
self.redis.Close()
func (rc *redisConsumer) Close() {
_ = rc.redis.Close()
}

+ 0
- 9
service/context.go View File

@ -74,15 +74,6 @@ func (rc *RequestContext) Config() config.Config {
return rc._service.config
}
//func (rc *RequestContext) Chan(topic string, objs ...interface{}) error {
// if len(objs) < 1 {
// return errors.New("No Object!")
// }
// taskObj := restlet.TaskObject{Queue: topic, Params: objs}
// rc._service.taskChans <- taskObj
// return nil
//}
func (rc *RequestContext) Publish(topic string, datas ...interface{}) error {
for _, bs := range datas {
var ps []byte

+ 0
- 35
service/service.go View File

@ -206,11 +206,6 @@ func (svr *KeplerService) Initialize() error {
return nil
}
//var sessionKeeper func(ctx restlet.Context, next http.Handler) http.Handler
//
//func SetSessionKeeper(ff func(ctx restlet.Context, next http.Handler) http.Handler) {
// sessionKeeper = ff
//}
func (svr *KeplerService) SetSessionKeeper(ff func(next restlet.RequestHandler) restlet.RequestHandler) {
svr.sessionKeeper = ff
@ -220,36 +215,6 @@ func (svr *KeplerService) NewContext(request *http.Request) restlet.RequestConte
return NewRequestContext(svr, request)
}
//func (svr *KeplerService) makeRestletHandler(h restlet.RestletHandler, predictor restlet.RequestPredictor, methods []string, cache *restlet.CacheController) http.Handler {
// return restlet.MakeRestletHandler(h, predictor, svr, methods, cache)
//}
//
//func (svr *KeplerService) registerRestlet(prefix string, predictor restlet.RequestPredictor, methods []string, handler restlet.RestletHandler, cache *restlet.CacheController) {
// h := svr.makeRestletHandler(handler, predictor, methods, cache)
// log.Debugln(">>", svr.prefix, prefix)
// if svr.prefix == "" {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), prefix)
// svr.router.Handle(prefix, h)
// } else {
// log.Debugf("Register:> Registering RestletHandler [%p] %s: %s ...", handler.Handle, strings.Join(methods, ","), svr.prefix+prefix)
// svr.router.Handle(svr.prefix+prefix, h)
// }
//}
//func (svr *KeplerService) TaskProc() {
// for t := range svr.taskChans {
// log.Debugln("KeplerService:>>>>>>>> Task Chan:", len(svr.taskChans))
// if h, ok := svr.taskHandlers[t.Queue]; ok {
// log.Debugln("KeplerService:>>> Task:", t.Queue)
// if e := h.Handle(NewTaskContext(svr), t.Params...); nil != e {
// log.Errorf("TaskProc:> %s Handler failed:> %s \n", t.Queue, e)
// }
// } else {
// log.Warnln("TaskProc:> Missing task handler for:", t.Queue)
// }
// }
//}
func makeMsgSubFunc(ctx restlet.TaskContext, h restlet.TaskletHandler) func([]byte) error {
var f = func(data []byte) error {
if e := h.Handle(ctx, data); nil != e {

Loading…
Cancel
Save