Compare commits

...

1 Commits

Author SHA1 Message Date
  Mingcai SHEN 12b81a1027 Merged tick save close. 4 years ago
1 changed files with 27 additions and 13 deletions
Split View
  1. +27
    -13
      msq/nsq.go

+ 27
- 13
msq/nsq.go View File

@ -92,31 +92,45 @@ func (nsqc *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e
q.SetLogger(nil, nsq.LogLevelError)
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
if nsqc.debug {
log.Debugln("nsqConsumer.Subscribe:> Get message:>", )
log.Debugln("nsqConsumer.Subscribe:> Get message:>")
}
//defer message.Finish()
//var counter = 0
stopChan := make(chan bool)
ticker := time.NewTicker(5 * time.Second)
go func() {
for t := range ticker.C {
message.Touch()
//counter += 1
if nsqc.debug {
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t)
go func(tk *time.Ticker) {
defer tk.Stop()
for {
select {
case t := <-tk.C:
message.Touch()
if nsqc.debug {
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t)
}
case <-stopChan:
return
}
//if counter*5 >= nsqc.timeout {
// message.Finish()
// ticker.Stop()
//}
}
}()
//for t := range ticker.C {
// message.Touch()
// //counter += 1
// if nsqc.debug {
// log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t)
// }
// //if counter*5 >= nsqc.timeout {
// // message.Finish()
// // ticker.Stop()
// //}
//}
}(ticker)
e := hf(message.Body)
if nil != e {
//message.Requeue()
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", nsqc.prefix+"."+topic, e)
}
ticker.Stop()
//ticker.Stop()
stopChan <- true
return e
}))
switch nsqc.kind {

Loading…
Cancel
Save