バイセル Tech Blog

バイセル Tech Blogは株式会社BuySell Technologiesのエンジニア達が知見・発見を共有する技術ブログです。

ヘッダー画像

Redis Pub/Sub でお手軽にお知らせ配信機能を実装する

はじめに

こちらは バイセルテクノロジーズ Advent Calendar 2021 の 11 日目の記事です。
前日の記事は@ShoyaSuzuki-bstさんの「SlackボットとGASで癒しを提供する話」でした。

こんにちは。開発 2 部の飯間です。 バイセルでは主にサーバサイドの開発に携わっています。

本記事では、開発中の「リユースプラットフォーム」におけるお知らせ機能の設計について紹介したい思います。

要件

お知らせ機能の要件は下記となります。

f:id:bst-tech:20211205145151p:plain

  • DB のリソースに変更があった際に、そのリソースに関連するユーザに対して、ブラウザ内通知で知らせたい
    • 個人単位でユーザを特定して通知する必要があるため、通知のチャンネルはユーザごとに分かれていなければならない
  • システムを利用する企業・店舗・人ごとにプライベートな通知チャンネルとして分ける必要がある
    • パブリックな全体配信機能だけでは不十分
    • ある企業への通知が、別の企業に届いてはならない

Redis Pub/Sub とは

Redis はデータベースやキャッシュとして利用されることが多いですが、 その他にも、 Pub/Sub メッセージングモデルにおけるメッセージブローカー (仲介者) としても使うことができます。

f:id:bst-tech:20211205145304p:plain

Redis での Pub/Sub は、それぞれ、任意の数のパブリッシャーチャンネルサブスクライバーの 3 要素で構成されます。
パブリッシャーがあるチャンネルにメッセージを配信すると、そのチャンネルを購読しているサブスクライバーに届きます。

パブリッシャーとサブスクライバーは互いの存在を知らなくてもよく、依存しません。

そのため、以下のような振る舞いをします

  • パブリッシャーがメッセージを送信した時点で、誰もそのチャンネルを購読していなかった場合、メッセージは誰にも届かない
    • 誰にも届かなかったからといってエラーにはならず、単にメッセージが揮発するだけ
  • パブリッシャーによって一度もメッセージが送信されたことのないチャンネルに対して、サブスクライバーは購読することが可能
    • チャンネルが存在しない等のエラーにはならず、初めてメッセージが送信されるまで待ち続けることが可能

構成

お知らせ機能の構成は下図の通りです。

f:id:bst-tech:20211205145350p:plain

  • パブリッシュサーバとサブスクライブサーバは Redis Pub/Sub で接続
  • サブスクライブサーバとクライアントは GraphQL Subscription で接続

この構成のポイントとしては、 Redis Pub/Sub Channel をユーザ単位でオープンしていることにあります。
クライアントはサブスクライブサーバと接続する際に、自身のユーザ名をパラメータとして渡し、その値で Pub/Sub Channel をオープンします。
本来のプロダクトコードでは、リクエストパラメータでユーザ名を渡すのではなく、 JWT 等で渡すのがよいでしょう。

なお、このような作りにしている理由ですが、上の 要件 の項目で述べた「個人単位でユーザを特定して通知する必要があるため」です。

コードについて

ここからは、設計にあたってプロトタイプとして実装したコードを掲載したいと思います。
コードの全体は下記リポジトリにアップしてあります。

github.com

サブスクライブサーバの実装

主要な技術スタック

GraphQL スキーマ定義

クライアントとサブスクライブサーバは GraphQL Subscription で接続されます。
サブスクライブサーバのスキーマ定義は下記の通りです。

pubsub-demo/sub/graph/schema.graphqls

"""
クライアントが受け取る通知の型
"""
type Notification {
  """
  お知らせ本文
  """
  text: String!

  """
  unixtime 形式のタイムスタンプ
  """
  timestamp: Int!
}

"""
WebSocket エンドポイント
引数 `userName` は Redis Pub/Sub のチャンネル名として使う
"""
type Subscription {
  onNotificationReceived(userName: String!): Notification!
}

GraphQL サーバの実装

gqlgen を使ってサーバを実装します。

pubsub-demo/sub/server.go

func main() {
    redisURL := mustGetEnv("REDIS_URL")

    // WebSocket を使う場合はトランスポートの追加を自分で行う必要があるため、
    // `handler.NewDefaultServer` ではなく `handler.New` で初期化すること
    // https://vallettaio.hatenablog.com/entry/2020/05/10/024551
    srv := handler.New(
        generated.NewExecutableSchema(
            generated.Config{
                Resolvers: graph.NewResolver(
                    context.Background(),
                    redisURL,
                ),
            },
        ),
    )
    // Subscription を使うには `transport.POST` と `transport.Websocket` をトランスポートとして追加すればよい
    srv.AddTransport(transport.POST{})
    srv.AddTransport(transport.Websocket{
        KeepAlivePingInterval: 10 * time.Second,
        Upgrader: websocket.Upgrader{
            CheckOrigin: func(r *http.Request) bool {
                return true
            },
        },
    })
    srv.Use(extension.Introspection{})

    // CORS の設定
    c := cors.New(cors.Options{
        // プロダクトコードであれば正しくオリジンを設定すること
        AllowedOrigins:   []string{"*"},
        AllowCredentials: true,
    })

    http.Handle("/", playground.Handler("GraphQL playground", "/graphql"))
    http.Handle("/graphql", c.Handler(srv))

    port := mustGetEnv("PORT")
    log.Printf("connect to http://localhost:%s/ for GraphQL playground", port)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

Resolver の実装

Resolver.observers でサーバに接続中のクライアントの状態を保持し、追跡できる状態にしておきます。 なお、接続状態を追跡する理由は下記の通りです。

  • Redis から通知を受け取った際に、メッセージを流す先のクライアントを判別するため
    • 例: Redis から Bob 宛ての通知を受け取った際に、接続中の Bob のクライアントだけにメッセージを流す
  • クライアントがサーバから切断したときに、 Redis のチャンネルを閉じるため

pubsub-demo/sub/graph/resolver.go

type Resolver struct {
    redisClient *redis.Client
    observers   map[string]map[string]chan<- *model.Notification // 接続中のクライアントの状態を格納
    mutex       sync.Mutex
}

func NewResolver(ctx context.Context, redisURL string) *Resolver {
    redisClient := redis.NewClient(&redis.Options{
        Addr: redisURL,
    })

    if err := redisClient.Ping(ctx).Err(); err != nil {
        panic("failed to ping redis server")
    }

    return &Resolver{
        redisClient: redisClient,
        observers:   map[string]map[string]chan<- *model.Notification{},
        mutex:       sync.Mutex{},
    }
}

Subscription の実装

WebSocket エンドポイントである onMessageReceived は下記のように実装されています。

pubsub-demo/sub/graph/schema.graphqls

  1. 接続クライアントの状態をサーバに保持する

    ch := make(chan *model.Notification, 1)
    
    u, err := uuid.NewRandom()
    if err != nil {
     return nil, err
    }
    connID := u.String()
    
    if _, ok := r.observers[userName]; !ok {
     r.observers[userName] = make(map[string]chan<- *model.Notification)
    }
    r.observers[userName][connID] = ch
    
  2. userName と同名の Redis Pub/Sub Channel を Subscribe する

    pubsub := r.redisClient.Subscribe(ctx, userName)
    
  3. Redis からの通知をハンドリングできるようにする

    go func() {
     pubsubCh := pubsub.Channel()
    
     for notification := range pubsubCh {
         n := &model.Notification{}
         if err := json.Unmarshal([]byte(notification.Payload), n); err != nil {
             continue
         }
         n.Timestamp = int(time.Now().Unix())
    
         // メッセージを配信
         r.mutex.Lock()
         for _, observer := range r.observers[userName] {
             observer <- n
         }
         r.mutex.Unlock()
     }
    }()
    
  4. クライアントによるサブスクリプションの終了をハンドリングできるようにする

    go func() {
     <-ctx.Done()
     r.mutex.Lock()
     delete(r.observers[userName], connID)
     r.mutex.Unlock()
    
     pubsub.Close()
    }()
    
    return ch, nil
    

動作確認

必要なコードが揃ったので動作確認をしてみます。

  1. サーバを起動する

    $ cd pubsub-demo
    $ docker compose up
    
  2. http://localhost:8081 を開き、サブスクライブサーバの GraphQL Playground を開く。 onNotificationReceived クエリを実行し、通知を待ち受ける f:id:bst-tech:20211205152653p:plain

  3. 別のタブで http://localhost:8080 を開き、パブリッシュサーバの GraphQL Playground を開く。 sendNotification クエリを実行し、通知を送信する f:id:bst-tech:20211205153143p:plain

通知を送信すると、 http://localhost:8081 のタブでメッセージを受け取る様子が確認できます。

f:id:bst-tech:20211205153853g:plain

まとめ

いかがでしたでしょうか。
Redis の Pub/Sub 機能を使うと、比較的簡単に通知配信が実現できることがお分かりいただけたかと思います。

さいごに

バイセルでは一緒に新規サービスの開発をしてくれるエンジニアを募集しています! hrmos.co

明日のバイセルテクノロジーズ Advent Calendar 2021@naoto_pqさんによる
「オンラインイベントで役立つアンケート作成のスキル」です。お楽しみに!

参考にさせていただいたサイト