|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
"strings" |
|
|
|
"errors" |
|
|
|
log "github.com/Sirupsen/logrus" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
type nsqProducer struct { |
|
|
@ -85,11 +86,19 @@ func (self *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e |
|
|
|
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { |
|
|
|
log.Debugln("nsqConsumer.Subscribe:> Get message:>", ) |
|
|
|
//defer message.Finish()
|
|
|
|
ticker := time.NewTicker(5 * time.Second) |
|
|
|
go func() { |
|
|
|
for t := range ticker.C { |
|
|
|
message.Touch() |
|
|
|
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t) |
|
|
|
} |
|
|
|
}() |
|
|
|
e := hf(message.Body) |
|
|
|
if nil != e { |
|
|
|
//message.Requeue()
|
|
|
|
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", self.prefix+"."+topic, e) |
|
|
|
} |
|
|
|
ticker.Stop() |
|
|
|
return e |
|
|
|
})) |
|
|
|
switch self.kind { |
|
|
|