From 58ad5a59e7688eaebf7f3b87461ef33e84e73312 Mon Sep 17 00:00:00 2001 From: Mingcai SHEN Date: Fri, 20 Jul 2018 11:27:42 +0800 Subject: [PATCH] Add ticker to touch message prevent timeout. --- msq/nsq.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/msq/nsq.go b/msq/nsq.go index 4294f28..0ba2c9b 100644 --- a/msq/nsq.go +++ b/msq/nsq.go @@ -5,6 +5,7 @@ import ( "strings" "errors" log "github.com/Sirupsen/logrus" + "time" ) type nsqProducer struct { @@ -85,11 +86,19 @@ func (self *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Debugln("nsqConsumer.Subscribe:> Get message:>", ) //defer message.Finish() + ticker := time.NewTicker(5 * time.Second) + go func() { + for t := range ticker.C { + message.Touch() + log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t) + } + }() e := hf(message.Body) if nil != e { //message.Requeue() log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", self.prefix+"."+topic, e) } + ticker.Stop() return e })) switch self.kind {