亚洲狠狠久久综合一区二区三区

<progress id="73rr5"></progress>
<tbody id="73rr5"><pre id="73rr5"></pre></tbody>

    <tbody id="73rr5"></tbody><dd id="73rr5"><track id="73rr5"></track></dd>
    go rabbitmq 使用教程 ,go rabbitmq 簡單隊列,go rabbitmq work模式,go rabbitmq 訂閱模式

    成人自考/成人高考/教師資格證/會計從業資格證/建造師/造價師,一個小程序就夠啦。

    使用Go的過程記錄了全部的rabbitmq的go代碼,方便自己下次Copy,go的資料比較少,seo估計很好做,流量速度過來。

    【一】.簡單隊列.生產者將消息發送到隊列,消費者從隊列中獲取消息。

    1.0.connection code

    func NewRabbitMQ() *amqp.Channel {
    	// 獲取connection
    	amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"
    	connection, err := amqp.Dial(amqUrl)
    	if err != nil {
    		panic(fmt.Sprintf("獲取connection異常:%s\n", err))
    	}
    
    	// 獲取channel
    	channel, err := connection.Channel()
    	if err != nil {
    		panic(fmt.Sprintf("獲取channel異常:%s\n", err))
    	}
    
    	return channel
    }

    1.1.client code:

    // 生產_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 生產_聲明隊列(不存在自動創建)
    queueName := "ic_order_active"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 生產_發送消息到隊列
    message := "ic元器件活動來新單啦"
    err = channel.Publish(
        // 交換機
        "",
        // 隊列名稱
        queueName,
        // true->根據自身exchange類型和routeKey規則無法找到符合條件的隊列會把消息返還給發送者,false->出現上述情況會直接將消息丟棄
        false,
        // true->當exchange將消息路由到隊列后發現隊列上沒有消費者則會把消息返還給發送者,false->出現上述情況,消息一樣會發送給消息隊列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 隊列和消息同時設置持久化
            DeliveryMode: 2,
            Body:         []byte(message),
        },
    )
    if err != nil {
        fmt.Printf("發送消息到隊列異常:%s", err)
        return
    }

    1.2.service code

    // 消費_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 消費_聲明隊列
    queueName := "ic_order_active"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 消費_獲取隊列中的消息
    message, err := channel.Consume(
        // 隊列名稱
        queueName,
        // 消費者名稱
        "ic訂單消費者",
        // 是否自動ack
        false,
        // 是否排他性隊列標識
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return
    }
    
    // 輸出消息
    for msg := range message {
        // 打印消息內容
        fmt.Printf("收到隊列消息%s \n", msg.Body)
        // 確認收到消息
        msg.Ack(true)
    }

    【二】.Work模式.一個生產者,多個消費者,一個消息只能被一個消費者獲取到

    2.0.client code

    // 生產_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 生產_聲明隊列(不存在自動創建)
    queueName := "ic_order_active"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 生產_發送消息到隊列
    message := "ic元器件活動來新單啦,訂單id"
    messageSize := 10
    for i := 0; i < messageSize; i++ {
        // 方便觀察消費者
        time.Sleep(time.Second * 1)
        err = channel.Publish(
            // 交換機
            "",
            // 隊列名稱
            queueName,
            // true->根據自身exchange類型和routeKey規則無法找到符合條件的隊列會把消息返還給發送者,false->出現上述情況會直接將消息丟棄
            false,
            // true->當exchange將消息路由到隊列后發現隊列上沒有消費者則會把消息返還給發送者,false->出現上述情況,消息一樣會發送給消息隊列
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                // 隊列和消息同時設置持久化
                DeliveryMode: 2,
                Body:         []byte(message + strconv.Itoa(i)),
            },
        )
        if err != nil {
            fmt.Printf("發送消息到隊列異常:%s", err)
            return
        }
    }

    2.1.service code

    // 消費_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 消費_聲明隊列
    queueName := "ic_order_active"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 設置同一時間服務器只會發送一條消息給消費者
    channel.Qos(
        // 每次獲取多少條
        10,
        // 預加載數量(rabbitMq不支持)
        0,
        // false->對當前隊列可用 true->對channel可用(rabbitMq不支持)
        false,
    )
    
    // 消費_獲取隊列中的消息
    message, err := channel.Consume(
        // 隊列名稱
        queueName,
        // 消費者名稱
        "ic訂單消費者",
        // 是否自動ack
        false,
        // 是否排他性隊列標識
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return
    }
    
    // 輸出消息
    for msg := range message {
        // 打印消息內容
        fmt.Printf("收到隊列消息%s \n", msg.Body)
        // 確認收到消息
        msg.Ack(true)
    }

    【三】.訂閱模式(fanout).

    一個生產者,多個消費者

    每個消費者擁有自己的隊列

    生產者將消息發送到交換機

    每個隊列自己去綁定交換機

    (交換機沒有儲存能力,發送到沒有任何隊列綁定的交換機則消息丟失)

    3.0.client code

    // 生產_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 生產_聲明交換機
    exchangeName := "notice"
    err := channel.ExchangeDeclare(
        // 交換機名稱
        exchangeName,
        // 交換機類型
        "fanout",
        // 持久化
        true,
        // true->當所有綁定都與交換器解綁后,會自動刪除此交換器
        false,
        // true->客戶端無法直接發送msg到內部交換器,只有交換器可以發送msg到內部交換器
        false,
        // 是否非阻塞
        false,
        // 其他參數
        nil,
    )
    if err != nil {
        fmt.Printf("聲明交換機異常:%s", err)
        return
    }
    
    // 生產_發送消息到交換機
    message := "最新消息,華秋全場元器件3折起"
    messageSize := 10
    for i := 0; i < messageSize; i++ {
        // 方便觀察消費者
        time.Sleep(time.Second * 1)
        err = channel.Publish(
            // 交換機
            exchangeName,
            // 路由key
            "",
            // true->根據自身exchange類型和routeKey規則無法找到符合條件的隊列會把消息返還給發送者,false->出現上述情況會直接將消息丟棄
            false,
            // true->當exchange將消息路由到隊列后發現隊列上沒有消費者則會把消息返還給發送者,false->出現上述情況,消息一樣會發送給消息隊列
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                // 隊列和消息同時設置持久化
                DeliveryMode: 2,
                Body:         []byte(message + strconv.Itoa(i)),
            },
        )
        if err != nil {
            fmt.Printf("發送消息到隊列異常:%s", err)
            return
        }
    }

    【四】.直接匹配(direct)

    4.0.client code

    // 生產_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 生產_聲明交換機
    exchangeName := "pcb_layout_order"
    err := channel.ExchangeDeclare(
        // 交換機名稱
        exchangeName,
        // 交換機類型
        "direct",
        // 持久化
        true,
        // true->當所有綁定都與交換器解綁后,會自動刪除此交換器
        false,
        // true->客戶端無法直接發送msg到內部交換器,只有交換器可以發送msg到內部交換器
        false,
        // 是否非阻塞
        false,
        // 其他參數
        nil,
    )
    if err != nil {
        fmt.Printf("聲明交換機異常:%s", err)
        return
    }
    
    // 生產_發送消息到交換機
    allRouteKey := []string{
        "order_insert", // 新增訂單
        "order_delete", // 刪除訂單
    }
    
    // 循環發送到兩個路由key
    message := "訂單id1事件"
    for _, routeKey := range allRouteKey {
        err = channel.Publish(
            // 交換機
            exchangeName,
            // 路由key
            routeKey,
            // true->根據自身exchange類型和routeKey規則無法找到符合條件的隊列會把消息返還給發送者,false->出現上述情況會直接將消息丟棄
            false,
            // true->當exchange將消息路由到隊列后發現隊列上沒有消費者則會把消息返還給發送者,false->出現上述情況,消息一樣會發送給消息隊列
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                // 隊列和消息同時設置持久化
                DeliveryMode: 2,
                Body:         []byte(message),
            },
        )
    }

    4.1.service code

    // 消費_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 消費_聲明隊列
    queueName := "notice_queue"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 隊列綁定交換機+綁定訂單新增key
    exchangeName := "pcb_layout_order"
    allRouteKey := []string{
        "order_insert", // 新增訂單
        "order_delete", // 刪除訂單
    }
    for _, routeKey := range allRouteKey {
        channel.QueueBind(
            // 隊列名稱
            queueName,
            // 綁定的鍵
            routeKey,
            // 交換機名稱
            exchangeName,
            // 是否阻塞處理
            false,
            // 其他參數
            nil,
        )
    }
    
    // 設置同一時間服務器只會發送一條消息給消費者
    channel.Qos(
        // 每次獲取多少條
        10,
        // 預加載數量(rabbitMq不支持)
        0,
        // false->對當前隊列可用 true->對channel可用(rabbitMq不支持)
        false,
    )
    
    // 消費_獲取隊列中的消息
    message, err := channel.Consume(
        // 隊列名稱
        queueName,
        // 消費者名稱
        "ic訂單消費者",
        // 是否自動ack
        false,
        // 是否排他性隊列標識
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return
    }
    // 輸出消息
    for msg := range message {
        // 打印消息內容
        fmt.Printf("收到隊列消息%s \n", msg.Body)
        // 確認收到消息
        msg.Ack(true)
    }

    【五】.直接匹配(topic)

    topic同樣根據key匹配到隊列,#匹配一個或者多個,*匹配一個.(切記:發往topic交換器的routing_key它必須是.分隔的幾個詞)

    5.0.client code

    // 生產_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 生產_聲明交換機
    exchangeName := "smt_steel_order"
    err := channel.ExchangeDeclare(
        // 交換機名稱
        exchangeName,
        // 交換機類型
        "topic",
        // 持久化
        true,
        // true->當所有綁定都與交換器解綁后,會自動刪除此交換器
        false,
        // true->客戶端無法直接發送msg到內部交換器,只有交換器可以發送msg到內部交換器
        false,
        // 是否非阻塞
        false,
        // 其他參數
        nil,
    )
    if err != nil {
        fmt.Printf("聲明交換機異常:%s", err)
        return
    }
    
    // 生產_發送消息到交換機
    allRouteKey := []string{
        "order.insert", // 新增訂單
        "order.delete", // 刪除訂單
    }
    for _, routeKey := range allRouteKey {
        //fmt.Print(routeKey)
        message := "來自" + routeKey + "的消息"
        err = channel.Publish(
            // 交換機
            exchangeName,
            // 路由key
            routeKey,
            // true->根據自身exchange類型和routeKey規則無法找到符合條件的隊列會把消息返還給發送者,false->出現上述情況會直接將消息丟棄
            true,
            // true->當exchange將消息路由到隊列后發現隊列上沒有消費者則會把消息返還給發送者,false->出現上述情況,消息一樣會發送給消息隊列
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                // 隊列和消息同時設置持久化
                DeliveryMode: 2,
                Body:         []byte(message),
            },
        )
    }

    5.1.service code

    // 消費_獲取connection的channel
    channel := Connecttion.NewRabbitMQ()
    
    // 消費_聲明隊列
    queueName := "notice_queue"
    _, err := channel.QueueDeclare(
        // 隊列名稱
        queueName,
        // 是否持久化
        false,
        // 是否自動刪除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞處理
        false,
        // 額外屬性
        nil,
    )
    if err != nil {
        fmt.Printf("聲明隊列異常:%s", err)
        return
    }
    
    // 隊列綁定交換機+綁定訂單新增key
    exchangeName := "smt_steel_order"
    routeKey := "order.#"
    channel.QueueBind(
        // 隊列名稱
        queueName,
        // 綁定的路由
        routeKey,
        // 交換機名稱
        exchangeName,
        // 是否阻塞處理
        false,
        // 其他參數
        nil,
    )
    
    // 設置同一時間服務器只會發送一條消息給消費者
    channel.Qos(
        // 每次獲取多少條
        10,
        // 預加載數量(rabbitMq不支持)
        0,
        // false->對當前隊列可用 true->對channel可用(rabbitMq不支持)
        false,
    )
    
    // 消費_獲取隊列中的消息
    message, err := channel.Consume(
        // 隊列名稱
        queueName,
        // 消費者名稱
        "smt訂單消費者",
        // 是否自動ack
        false,
        // 是否排他性隊列標識
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return
    }
    
    // 輸出消息
    for msg := range message {
        // 打印消息內容
        fmt.Printf("收到隊列消息%s \n", msg.Body)
        // 確認收到消息
        msg.Ack(true)
    }
    訪客
    郵箱
    網址

    Top 亚洲狠狠久久综合一区二区三区
    <progress id="73rr5"></progress>
    <tbody id="73rr5"><pre id="73rr5"></pre></tbody>

      <tbody id="73rr5"></tbody><dd id="73rr5"><track id="73rr5"></track></dd>