|
@ -92,31 +92,45 @@ func (nsqc *nsqConsumer) Subscribe(topic string, hf HandleFunc, chn ...string) e |
|
|
q.SetLogger(nil, nsq.LogLevelError) |
|
|
q.SetLogger(nil, nsq.LogLevelError) |
|
|
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { |
|
|
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { |
|
|
if nsqc.debug { |
|
|
if nsqc.debug { |
|
|
log.Debugln("nsqConsumer.Subscribe:> Get message:>", ) |
|
|
|
|
|
|
|
|
log.Debugln("nsqConsumer.Subscribe:> Get message:>") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//defer message.Finish()
|
|
|
//defer message.Finish()
|
|
|
//var counter = 0
|
|
|
//var counter = 0
|
|
|
|
|
|
stopChan := make(chan bool) |
|
|
ticker := time.NewTicker(5 * time.Second) |
|
|
ticker := time.NewTicker(5 * time.Second) |
|
|
go func() { |
|
|
|
|
|
for t := range ticker.C { |
|
|
|
|
|
message.Touch() |
|
|
|
|
|
//counter += 1
|
|
|
|
|
|
if nsqc.debug { |
|
|
|
|
|
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t) |
|
|
|
|
|
|
|
|
go func(tk *time.Ticker) { |
|
|
|
|
|
defer tk.Stop() |
|
|
|
|
|
for { |
|
|
|
|
|
select { |
|
|
|
|
|
case t := <-tk.C: |
|
|
|
|
|
message.Touch() |
|
|
|
|
|
if nsqc.debug { |
|
|
|
|
|
log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t) |
|
|
|
|
|
} |
|
|
|
|
|
case <-stopChan: |
|
|
|
|
|
return |
|
|
} |
|
|
} |
|
|
//if counter*5 >= nsqc.timeout {
|
|
|
|
|
|
// message.Finish()
|
|
|
|
|
|
// ticker.Stop()
|
|
|
|
|
|
//}
|
|
|
|
|
|
} |
|
|
} |
|
|
}() |
|
|
|
|
|
|
|
|
//for t := range ticker.C {
|
|
|
|
|
|
// message.Touch()
|
|
|
|
|
|
// //counter += 1
|
|
|
|
|
|
// if nsqc.debug {
|
|
|
|
|
|
// log.Debugln("nsqConsumer.Subscribe:> Message Touched:>", t)
|
|
|
|
|
|
// }
|
|
|
|
|
|
// //if counter*5 >= nsqc.timeout {
|
|
|
|
|
|
// // message.Finish()
|
|
|
|
|
|
// // ticker.Stop()
|
|
|
|
|
|
// //}
|
|
|
|
|
|
//}
|
|
|
|
|
|
}(ticker) |
|
|
e := hf(message.Body) |
|
|
e := hf(message.Body) |
|
|
if nil != e { |
|
|
if nil != e { |
|
|
//message.Requeue()
|
|
|
//message.Requeue()
|
|
|
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", nsqc.prefix+"."+topic, e) |
|
|
log.Errorf("nsqConsumer.Subscribe:> Message '%s' proc failed:> %s", nsqc.prefix+"."+topic, e) |
|
|
} |
|
|
} |
|
|
ticker.Stop() |
|
|
|
|
|
|
|
|
//ticker.Stop()
|
|
|
|
|
|
stopChan <- true |
|
|
return e |
|
|
return e |
|
|
})) |
|
|
})) |
|
|
switch nsqc.kind { |
|
|
switch nsqc.kind { |
|
|