2016-04-20 20 views
0

rabbitmqサーバへの再起動接続をテストします。 オンテストする小さなスクリプトを書きました。 http://play.golang.org/p/l3ZWzG0Qqb しかし、動作していません。Golang amqp reconnect

ステップ10では、チャネルと接続を閉じます。そしてそれらを再び開いてください。 chan amqp.Confirmation(:75)を再作成します。そして、サイクルを続ける。 しかし、その後、陳は何も返さないことを確認します。

UPD:ここにコード。

package main 

import (
    "fmt" 
    "github.com/streadway/amqp" 
    "log" 
    "os" 
    "time" 
) 

const SERVER = "amqp://user:[email protected]:5672/" 
const EXCHANGE_NAME = "publisher.test.1" 
const EXCHANGE_TYPE = "direct" 
const ROUTING_KEY = "publisher.test" 

var Connection *amqp.Connection 
var Channel *amqp.Channel 

func setup(url string) (*amqp.Connection, *amqp.Channel, error) { 
    conn, err := amqp.Dial(url) 
    if err != nil { 
     return nil, nil, err 
    } 

    ch, err := conn.Channel() 
    if err != nil { 
     return nil, nil, err 
    } 

    return conn, ch, nil 
} 

func main() { 
    url := SERVER 

    Connection, Channel, err := setup(url) 
    if err != nil { 
     fmt.Println("err publisher setup:", err) 
     return 
    } 

    confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1)) 
    if err := Channel.Confirm(false); err != nil { 
     log.Fatalf("confirm.select destination: %s", err) 
    } 

    for i := 1; i <= 3000000; i++ { 
     log.Println(i) 

     if err != nil { 
      fmt.Println("err consume:", err) 
      return 
     } 

     if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{ 
      Body: []byte(fmt.Sprintf("%d", i)), 
     }); err != nil { 
      fmt.Println("err publish:", err) 
      log.Printf("%+v", err) 
      os.Exit(1) 
      return 
     } 

     // only ack the source delivery when the destination acks the publishing 
     confirmed := <-confirms 
     if confirmed.Ack { 
      log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) 
     } else { 
      log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) 
      // TODO. Reconnect will be here 
     } 

     if i == 10 { 
      Channel.Close() 
      Connection.Close() 
      while := true 
      for while { 
       log.Println("while") 
       time.Sleep(time.Second * 1) 
       Connection, Channel, err = setup(url) 
       if err == nil { 
        while = false 
        confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1)) 
        log.Printf("%+v", confirms) 
       } 
      } 
     } 
     time.Sleep(time.Millisecond * 300) 
    } 

    os.Exit(1) 
} 

答えて

0

チャネルを確認モードにする必要があります。 channel.Confirm()メソッドを呼び出して 接続を閉じた後、同じ接続で新しいチャンネルを取得した後でも、チャンネルが古いチャンネルと異なるためConfirm()メソッドを再度呼び出す必要があります。また、新しいチャンネルのデフォルトは確認を送信しません。