先日試した無限スクロールを試した。 SSEを使っての戦略は、筋があまり良くないらしい。
調べてみるとTwitterのタイムラインや、Facebookのニュースフィードは、「fan-out戦略」を採用しているらしい。 断片的な情報が見えてきたので、実装してみた。
最終的に動いているのは以下のような様子。
参考
fan-out 戦略とは 資料を読むに、「1つのイベントを如何に配るのか」にフォーカスした戦略。 Twitterであれば、「ユーザーのツイートをフォロワーへ如何に配るのか」という問題に適用されている。
タイムラインを構築するときに、ユーザーのツイートをフォロワーのタイムラインに配る方法は大きく分けて2つある。 「見るとき集める」「先に配るか」の2択になる。 前者は、書き込みは極端には1レコード書き込むような処理で終了できる。 しかし、読むときに、フォロー対象さすべてのツイートを集め、ソートしてから表示する必要がある。 後者は、書き込みのときに、フォロワーすべてに配ってしまう。 配られているので、そのまま素直に並べて返却すればよい。
Redis fan-out戦略の文脈でRedisを使う場合、RailsCacheではあまり使わないメソッドが出てくる。
リストを操作するもの
LPUSH : 先頭に要素を追加するコマンド
RPUSH : 末尾に要素を追加するコマンド
LRANGE : 指定した範囲の要素を取得するコマンド
LTRIM : 指定した範囲以外の要素を削除するコマンド
ソート済みセットを操作するもの
ZADD : 要素を追加するコマンド
ZRANGE : 指定した範囲の要素を取得するコマンド
ZREMRANGEBYSCORE : 指定したスコア範囲の要素を削除するコマンド
ZREMRANGEBYRANK : 指定したランク範囲の要素を削除するコマンド
ZREMRANGEBYLEX : 指定した文字列範囲の要素を削除するコマンド
ZCARD : 要素数を取得するコマンド
ZCOUNT : 指定したスコア範囲の要素数を取得するコマンド
ZREVRANGE : 指定した範囲の要素を逆順で取得するコマンド
これらは、直接Redisを直接使う必要がある。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 > REDIS .rpush("list1" , 1 ) => 1 > REDIS .rpush("list1" , 2 ) => 2 > REDIS .lpush("list1" , 3 ) => 3 > REDIS .lrange("list1" ,0 ,-1 ) => ["3" , "1" , "2" ] > REDIS .llen("list1" ) => 3 > REDIS .ltrim("list1" ,0 ,1 ) => "OK" > REDIS .lrange("list1" ,0 ,-1 ) => ["3" , "1" ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 > REDIS .zadd("slist1" ,0 ,0 ) => true > REDIS .zadd("slist1" ,1 ,1 ) => true > REDIS .zadd("slist1" ,2 ,2.3 ) => true > REDIS .zrange("slist" , 0 , -1 ) => [] > REDIS .zrange("slist" , 0 , 1 ) => [] > REDIS .zrange("slist1" , 0 , -1 ) => ["0" , "1" , "2.3" ] > REDIS .zrevrange("slist1" , 0 , -1 ) => ["2.3" , "1" , "0" ]
実装 前提 実装方針として以下の前提を置きます。
モデル モデルとして以下があるとします。
classDiagram
User "1" --> "*" Article : has_many
User "1" --> "*" Follow : has_many (as follower)
User "1" --> "*" Follow : has_many (as following)
class User {
+int id PK
+string name
+datetime created_at
+datetime updated_at
}
class Article {
+int id PK
+int user_id FK
+text comment
+datetime created_at
+datetime updated_at
}
class Follow {
+int id PK
+int follower_id FK
+int following_id FK
+datetime created_at
+datetime updated_at
}
処理
Userは、記事を投稿できる。(テストのため、Rakeタスクで定期的に投稿する)
Userは、他のユーザーをフォローできる。
フォローしたとき、フォロー対象のユーザーの投稿がタイムラインに表示される。(遅延処理によりタイムラインを更新する。)
Userは、フォローしているユーザーすべての投稿がマージされたタイムラインを閲覧できる。
他のユーザーをアンフォローしたとき、タイムラインからそのユーザーの投稿が消える。
ソース モデル app/models/user.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class User < ApplicationRecord has_many :articles , dependent: :destroy has_many :active_follows , class_name: 'Follow' , foreign_key: 'follower_id' , dependent: :destroy has_many :passive_follows , class_name: 'Follow' , foreign_key: 'following_id' , dependent: :destroy has_many :following , through: :active_follows , source: :following has_many :followers , through: :passive_follows , source: :follower def follow (user ) following << user unless following?(user) | | self == user end def unfollow (user ) following.delete(user) end def following? (user ) following.include ?(user) end end
app/models/follow.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 class Follow < ApplicationRecord belongs_to :follower , class_name: 'User' belongs_to :following , class_name: 'User' validates :follower_id , uniqueness: { scope: :following_id } validate :cannot_follow_self private def cannot_follow_self errors.add(:following_id , "can't follow yourself" ) if follower_id == following_id end end
app/models/article.rb 1 2 3 class Article < ApplicationRecord belongs_to :user end
コントローラー app/controllers/articles_controller.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class ArticlesController < ApplicationController before_action :set_current_user def index page = params[:page ].to_i page = 1 if page < 1 per_page = 20 ids = REDIS .zrevrange("timeline:#{@current_user .id} " , (page - 1 ) * per_page, page * per_page).map(&:to_i ) has_next_page = ids.size > per_page ids = ids.first(per_page) articles_by_id = Article .where(id: ids).includes(:user ).index_by(&:id ) articles = ids.map { |id | articles_by_id[id] }.compact render json: { articles: articles.map { |a | { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } }, has_next_page: has_next_page, next_page: has_next_page ? page + 1 : nil } end private def set_current_user @current_user = User .first end end
app/controllers/users_controller.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class UsersController < ApplicationController before_action :set_current_user def index @pagy , @users = pagy(User .where.not (id: @current_user .id).order(:name )) end def show @user = User .includes(:articles , :followers , :following ).find(params[:id ]) @articles = @user .articles.order(created_at: :desc ) end def follow user = User .find(params[:id ]) @current_user .follow(user) UpdateFollowTimelineJob .perform_later(@current_user .id, user.id) redirect_to users_path, notice: "#{user.name} をフォローしました" end def unfollow user = User .find(params[:id ]) @current_user .unfollow(user) UpdateUnfollowTimelineJob .perform_later(@current_user .id, user.id) redirect_to users_path, notice: "#{user.name} のフォローを解除しました" end private def set_current_user @current_user = User .first end end
app/controllers/timelines_controller.rb 1 2 3 4 class TimelineController < ApplicationController def index end end
ジョブ app/jobs/update_follow_timeline_job. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class UpdateFollowTimelineJob < ApplicationJob queue_as :default def perform (follower_id, followed_user_id ) timeline_key = "timeline:#{follower_id} " user = User .find(followed_user_id) REDIS .pipelined do |pipeline | user.articles.find_each do |article | pipeline.zadd(timeline_key, article.id, article.id) end pipeline.zremrangebyrank(timeline_key, 0 , -501 ) end end end
app/jobs/update_unfollow_timeline_job.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class UpdateUnfollowTimelineJob < ApplicationJob queue_as :default def perform (follower_id, unfollowed_user_id ) timeline_key = "timeline:#{follower_id} " user = User .find(unfollowed_user_id) article_ids = user.articles.last(500 ).pluck(:id ) if article_ids.any? REDIS .pipelined do |pipeline | article_ids.each do |article_id | pipeline.zrem(timeline_key, article_id) end end end end end
app/jobs/add_article_job.rb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class AddArticleJob < ApplicationJob queue_as :default def perform users = User .order("RANDOM()" ).limit(300 ) return unless users.any? current_user = User .first users.each do |user | Article .create!({ user_id: user.id, comment: Faker : :Lorem .paragraph, }) timeline_key = "timeline:#{current_user.id} " if Follow .exists?(follower_id: current_user.id, following_id: user.id) REDIS .pipelined do |pipeline | user.articles.find_each do |article | pipeline.zadd(timeline_key, article.id, article.id) end pipeline.zremrangebyrank(timeline_key, 0 , -501 ) end end end rescue => e Rails .logger.error "[AddArticleJob] Error: #{e.message} " end end
フロント側 フロント側の構成は、Rails x React で無限スクロールとストリーミング とほぼ同様。 ただし、SSEではなく単純にページングを利用したfetch APIの呼び出し。
挙動 ソースも参照しながら、実際の挙動を追うと以下のようになっている。
フォロー時のタイムライン更新 UserA が UserB をフォローすると、UserB の投稿が UserA のタイムラインに追加される。
sequenceDiagram
participant UserA as UserA (フォローする側)
participant Rails as Rails Server
participant Job as UpdateFollowTimelineJob
participant Redis as Redis
participant DB as PostgreSQL
UserA->>Rails: POST /users/:id/follow
Rails->>DB: Follow.create(follower: UserA, following: UserB)
Rails->>Job: perform_later(UserA.id, UserB.id)
Rails-->>UserA: redirect (フォロー完了)
Note over Job,Redis: 非同期処理
Job->>DB: UserB.articles を取得
DB-->>Job: [Article#101, Article#102, Article#103]
Job->>Redis: ZADD timeline:UserA.id 101 101
Job->>Redis: ZADD timeline:UserA.id 102 102
Job->>Redis: ZADD timeline:UserA.id 103 103
Job->>Redis: ZREMRANGEBYRANK timeline:UserA.id 0 -501
Note over Redis: 最新500件を保持
具体例:
操作前 (UserA のタイムライン)
操作後 (UserA のタイムライン)
timeline:1 = [50, 49, 48, …]
timeline:1 = [103, 102, 101, 50, 49, 48, …]
UserB の記事(ID: 101, 102, 103)がタイムラインに追加される。
アンフォロー時のタイムライン更新 UserA が UserB をアンフォローすると、UserB の投稿が UserA のタイムラインから削除される。
sequenceDiagram
participant UserA as UserA (アンフォローする側)
participant Rails as Rails Server
participant Job as UpdateUnfollowTimelineJob
participant Redis as Redis
participant DB as PostgreSQL
UserA->>Rails: DELETE /users/:id/unfollow
Rails->>DB: Follow.delete(follower: UserA, following: UserB)
Rails->>Job: perform_later(UserA.id, UserB.id)
Rails-->>UserA: redirect (アンフォロー完了)
Note over Job,Redis: 非同期処理
Job->>DB: UserB.articles.last(500).pluck(:id)
DB-->>Job: [101, 102, 103]
Job->>Redis: ZREM timeline:UserA.id 101
Job->>Redis: ZREM timeline:UserA.id 102
Job->>Redis: ZREM timeline:UserA.id 103
Note over Redis: UserBの投稿を削除
具体例:
操作前 (UserA のタイムライン)
操作後 (UserA のタイムライン)
timeline:1 = [103, 102, 101, 50, 49, 48, …]
timeline:1 = [50, 49, 48, …]
UserB の記事(ID: 101, 102, 103)がタイムラインから削除される。
タイムライン閲覧時 sequenceDiagram
participant UserA as UserA
participant React as React (Frontend)
participant Rails as Rails Server
participant Redis as Redis
participant DB as PostgreSQL
UserA->>React: タイムラインページ表示
React->>Rails: GET /articles?page=1
Rails->>Redis: ZREVRANGE timeline:UserA.id 0 20
Redis-->>Rails: [103, 102, 101, 50, 49, ...]
Rails->>DB: Article.where(id: [...]).includes(:user)
DB-->>Rails: [Article#103, Article#102, ...]
Rails-->>React: JSON {articles: [...], has_next_page: true}
React-->>UserA: 記事一覧を表示
Redisから記事IDを取得し、その順序を維持したままDBから記事データを取得して返却する。
動きの様子
比較 「fan-out 戦略いいじゃん」と安易に飛びつかないため、比較をしてみる。
以下ソースは、タイムラインの取得処理を「見るとき集める」戦略で実装したものを用意し、データの作成までの時間を計測する。
app/controllers/articles_controller.rb(見るとき集める版) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class ArticlesController < ApplicationController before_action :set_current_user def index start_time = Process .clock_gettime(Process : :CLOCK_MONOTONIC ) page = params[:page ].to_i page = 1 if page < 1 per_page = 20 following_ids = @current_user .following.pluck(:id ) articles = Article .includes(:user ).where(user_id: following_ids).order(created_at: :desc ).limit(per_page + 1 ).offset((page - 1 ) * per_page) has_next_page = articles.size > per_page articles_hash = articles.limit(per_page).map { |a | { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } } total_time = ((Process .clock_gettime(Process : :CLOCK_MONOTONIC ) - start_time) * 1000 ).round(2 ) REDIS .lpush("honesty_query_timeline_load_times" , total_time) REDIS .ltrim("honesty_query_timeline_load_times" , 0 , 99 ) times = REDIS .lrange("honesty_query_timeline_load_times" , 0 , -1 ).map(&:to_f ) avg_time = (times.sum / times.size).round(2 ) max_time = times.max.round(2 ) min_time = times.min.round(2 ) p "[ArticlesController#index] 件数: #{times.size} , Avg: #{avg_time} ms, Max: #{max_time} ms, Min: #{min_time} ms" render json: { articles: articles_hash, has_next_page: has_next_page, next_page: has_next_page ? page + 1 : nil } end end
app/controllers/articles_controller.rb(fan-out戦略版) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class ArticlesController < ApplicationController before_action :set_current_user def index start_time = Process .clock_gettime(Process : :CLOCK_MONOTONIC ) page = params[:page ].to_i page = 1 if page < 1 per_page = 20 ids = REDIS .zrevrange("timeline:#{@current_user .id} " , (page - 1 ) * per_page, page * per_page).map(&:to_i ) has_next_page = ids.size > per_page ids = ids.first(per_page) articles_by_id = Article .where(id: ids).includes(:user ).index_by(&:id ) articles = ids.map { |id | articles_by_id[id] }.compact articles_hash = articles.map { |a | { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } } total_time = ((Process .clock_gettime(Process : :CLOCK_MONOTONIC ) - start_time) * 1000 ).round(2 ) REDIS .lpush("fan-out_timeline_load_times" , total_time) REDIS .ltrim("fan-out_timeline_load_times" , 0 , 99 ) times = REDIS .lrange("fan-out_timeline_load_times" , 0 , -1 ).map(&:to_f ) avg_time = (times.sum / times.size).round(2 ) max_time = times.max.round(2 ) min_time = times.min.round(2 ) p "[ArticlesController#index] 件数: #{times.size} , Avg: #{avg_time} ms, Max: #{max_time} ms, Min: #{min_time} ms" render json: { articles: articles_hash, has_next_page: has_next_page, next_page: has_next_page ? page + 1 : nil } end end
実行時に10数件フォローをしてから実行した。 動かしてみると、以下のような結果になった。
戦略
平均ロード時間
最大ロード時間
最小ロード時間
見るとき集める
30.6ms
74.59ms
23.92ms
fan-out戦略
11.84ms
37.51ms
7.94ms
1 2 3 4 5 見るとき集める "[ArticlesController#index] 件数: 100, Avg: 30.6ms, Max: 74.59ms, Min: 23.92ms" fan-out戦略 "[ArticlesController#index] 件数: 100, Avg: 11.84ms, Max: 37.51ms, Min: 7.94ms"
ローカルで実行というものだけど、平均で約3倍、最大で約2倍の差が出ている。 データ量・フォロー対象者が増えれば、もっと差は出る可能性がある。
fan-out戦略を試してみた。 比較してみると、確かに見るとき集める戦略よりも高速にタイムラインを表示できている。
ただし、Twitterがこの戦略にたどり着くまでに、いろいろとためしていてたどりついたもので、第一選択肢かといえば疑問符がつく。
「同じようなタイムラインでも流れ自体がそもそも遅い」などあれば、ある程度単純にキャッシュすれば満足できる可能性もある。
では。