はじめに
こちらは バイセルテクノロジーズ Advent Calendar 2021 の 11 日目の記事です。
前日の記事は@ShoyaSuzuki-bstさんの「SlackボットとGASで癒しを提供する話」でした。
こんにちは。開発 2 部の飯間です。 バイセルでは主にサーバサイドの開発に携わっています。
本記事では、開発中の「リユースプラットフォーム」におけるお知らせ機能の設計について紹介したい思います。
要件
お知らせ機能の要件は下記となります。
- DB のリソースに変更があった際に、そのリソースに関連するユーザに対して、ブラウザ内通知で知らせたい
- 個人単位でユーザを特定して通知する必要があるため、通知のチャンネルはユーザごとに分かれていなければならない
- システムを利用する企業・店舗・人ごとにプライベートな通知チャンネルとして分ける必要がある
- パブリックな全体配信機能だけでは不十分
- ある企業への通知が、別の企業に届いてはならない
Redis Pub/Sub とは
Redis はデータベースやキャッシュとして利用されることが多いですが、 その他にも、 Pub/Sub メッセージングモデルにおけるメッセージブローカー (仲介者) としても使うことができます。
Redis での Pub/Sub は、それぞれ、任意の数のパブリッシャー
・ チャンネル
・サブスクライバー
の 3 要素で構成されます。
パブリッシャーがあるチャンネルにメッセージを配信すると、そのチャンネルを購読しているサブスクライバーに届きます。
パブリッシャーとサブスクライバーは互いの存在を知らなくてもよく、依存しません。
そのため、以下のような振る舞いをします
- パブリッシャーがメッセージを送信した時点で、誰もそのチャンネルを購読していなかった場合、メッセージは誰にも届かない
- 誰にも届かなかったからといってエラーにはならず、単にメッセージが揮発するだけ
- パブリッシャーによって一度もメッセージが送信されたことのないチャンネルに対して、サブスクライバーは購読することが可能
- チャンネルが存在しない等のエラーにはならず、初めてメッセージが送信されるまで待ち続けることが可能
構成
お知らせ機能の構成は下図の通りです。
- パブリッシュサーバとサブスクライブサーバは Redis Pub/Sub で接続
- サブスクライブサーバとクライアントは GraphQL Subscription で接続
この構成のポイントとしては、 Redis Pub/Sub Channel をユーザ単位でオープンしていることにあります。
クライアントはサブスクライブサーバと接続する際に、自身のユーザ名をパラメータとして渡し、その値で Pub/Sub Channel をオープンします。
本来のプロダクトコードでは、リクエストパラメータでユーザ名を渡すのではなく、 JWT 等で渡すのがよいでしょう。
なお、このような作りにしている理由ですが、上の 要件
の項目で述べた「個人単位でユーザを特定して通知する必要があるため」です。
コードについて
ここからは、設計にあたってプロトタイプとして実装したコードを掲載したいと思います。
コードの全体は下記リポジトリにアップしてあります。
サブスクライブサーバの実装
主要な技術スタック
- Go: version 1.16
- Redis クライアント: go-redis/redis
- GraphQL サーバ: 99designs/gqlgen
- Redis: version 6.2.2
GraphQL スキーマ定義
クライアントとサブスクライブサーバは GraphQL Subscription で接続されます。
サブスクライブサーバのスキーマ定義は下記の通りです。
pubsub-demo/sub/graph/schema.graphqls
""" クライアントが受け取る通知の型 """ type Notification { """ お知らせ本文 """ text: String! """ unixtime 形式のタイムスタンプ """ timestamp: Int! } """ WebSocket エンドポイント 引数 `userName` は Redis Pub/Sub のチャンネル名として使う """ type Subscription { onNotificationReceived(userName: String!): Notification! }
GraphQL サーバの実装
gqlgen
を使ってサーバを実装します。
pubsub-demo/sub/server.go
func main() { redisURL := mustGetEnv("REDIS_URL") // WebSocket を使う場合はトランスポートの追加を自分で行う必要があるため、 // `handler.NewDefaultServer` ではなく `handler.New` で初期化すること // https://vallettaio.hatenablog.com/entry/2020/05/10/024551 srv := handler.New( generated.NewExecutableSchema( generated.Config{ Resolvers: graph.NewResolver( context.Background(), redisURL, ), }, ), ) // Subscription を使うには `transport.POST` と `transport.Websocket` をトランスポートとして追加すればよい srv.AddTransport(transport.POST{}) srv.AddTransport(transport.Websocket{ KeepAlivePingInterval: 10 * time.Second, Upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, }, }) srv.Use(extension.Introspection{}) // CORS の設定 c := cors.New(cors.Options{ // プロダクトコードであれば正しくオリジンを設定すること AllowedOrigins: []string{"*"}, AllowCredentials: true, }) http.Handle("/", playground.Handler("GraphQL playground", "/graphql")) http.Handle("/graphql", c.Handler(srv)) port := mustGetEnv("PORT") log.Printf("connect to http://localhost:%s/ for GraphQL playground", port) log.Fatal(http.ListenAndServe(":"+port, nil)) }
Resolver の実装
Resolver.observers
でサーバに接続中のクライアントの状態を保持し、追跡できる状態にしておきます。
なお、接続状態を追跡する理由は下記の通りです。
- Redis から通知を受け取った際に、メッセージを流す先のクライアントを判別するため
- 例: Redis から Bob 宛ての通知を受け取った際に、接続中の Bob のクライアントだけにメッセージを流す
- クライアントがサーバから切断したときに、 Redis のチャンネルを閉じるため
pubsub-demo/sub/graph/resolver.go
type Resolver struct { redisClient *redis.Client observers map[string]map[string]chan<- *model.Notification // 接続中のクライアントの状態を格納 mutex sync.Mutex } func NewResolver(ctx context.Context, redisURL string) *Resolver { redisClient := redis.NewClient(&redis.Options{ Addr: redisURL, }) if err := redisClient.Ping(ctx).Err(); err != nil { panic("failed to ping redis server") } return &Resolver{ redisClient: redisClient, observers: map[string]map[string]chan<- *model.Notification{}, mutex: sync.Mutex{}, } }
Subscription の実装
WebSocket エンドポイントである onMessageReceived
は下記のように実装されています。
pubsub-demo/sub/graph/schema.graphqls
- 接続クライアントの状態をサーバに保持する
ch := make(chan *model.Notification, 1)
u, err := uuid.NewRandom()
if err != nil {
return nil, err
}
connID := u.String()
if _, ok := r.observers[userName]; !ok {
r.observers[userName] = make(map[string]chan<- *model.Notification)
}
r.observers[userName][connID] = ch
userName
と同名の Redis Pub/Sub Channel を Subscribe する
pubsub := r.redisClient.Subscribe(ctx, userName)
- Redis からの通知をハンドリングできるようにする
go func() {
pubsubCh := pubsub.Channel()
for notification := range pubsubCh {
n := &model.Notification{}
if err := json.Unmarshal([]byte(notification.Payload), n); err != nil {
continue
}
n.Timestamp = int(time.Now().Unix())
// メッセージを配信
r.mutex.Lock()
for _, observer := range r.observers[userName] {
observer <- n
}
r.mutex.Unlock()
}
}()
- クライアントによるサブスクリプションの終了をハンドリングできるようにする
go func() {
<-ctx.Done()
r.mutex.Lock()
delete(r.observers[userName], connID)
r.mutex.Unlock()
pubsub.Close()
}()
return ch, nil
動作確認
必要なコードが揃ったので動作確認をしてみます。
- サーバを起動する
$ cd pubsub-demo
$ docker compose up
http://localhost:8081 を開き、サブスクライブサーバの GraphQL Playground を開く。
onNotificationReceived
クエリを実行し、通知を待ち受ける別のタブで http://localhost:8080 を開き、パブリッシュサーバの GraphQL Playground を開く。
sendNotification
クエリを実行し、通知を送信する
通知を送信すると、 http://localhost:8081 のタブでメッセージを受け取る様子が確認できます。
まとめ
いかがでしたでしょうか。
Redis の Pub/Sub 機能を使うと、比較的簡単に通知配信が実現できることがお分かりいただけたかと思います。
さいごに
バイセルでは一緒に新規サービスの開発をしてくれるエンジニアを募集しています! herp.careers
明日のバイセルテクノロジーズ Advent Calendar 2021は@naoto_pqさんによる
「オンラインイベントで役立つアンケート作成のスキル」です。お楽しみに!