fan-out 戦略をやってみる

先日試した無限スクロールを試した。
SSEを使っての戦略は、筋があまり良くないらしい。

調べてみるとTwitterのタイムラインや、Facebookのニュースフィードは、「fan-out戦略」を採用しているらしい。
断片的な情報が見えてきたので、実装してみた。

最終的に動いているのは以下のような様子。

参考

fan-out 戦略とは

資料を読むに、「1つのイベントを如何に配るのか」にフォーカスした戦略。
Twitterであれば、「ユーザーのツイートをフォロワーへ如何に配るのか」という問題に適用されている。

タイムラインを構築するときに、ユーザーのツイートをフォロワーのタイムラインに配る方法は大きく分けて2つある。
「見るとき集める」「先に配るか」の2択になる。
前者は、書き込みは極端には1レコード書き込むような処理で終了できる。
しかし、読むときに、フォロー対象さすべてのツイートを集め、ソートしてから表示する必要がある。
後者は、書き込みのときに、フォロワーすべてに配ってしまう。
配られているので、そのまま素直に並べて返却すればよい。

Redis

fan-out戦略の文脈でRedisを使う場合、RailsCacheではあまり使わないメソッドが出てくる。

  • リストを操作するもの

    • LPUSH : 先頭に要素を追加するコマンド
    • RPUSH : 末尾に要素を追加するコマンド
    • LRANGE : 指定した範囲の要素を取得するコマンド
    • LTRIM : 指定した範囲以外の要素を削除するコマンド
  • ソート済みセットを操作するもの

    • ZADD : 要素を追加するコマンド
    • ZRANGE : 指定した範囲の要素を取得するコマンド
    • ZREMRANGEBYSCORE : 指定したスコア範囲の要素を削除するコマンド
    • ZREMRANGEBYRANK : 指定したランク範囲の要素を削除するコマンド
    • ZREMRANGEBYLEX : 指定した文字列範囲の要素を削除するコマンド
    • ZCARD : 要素数を取得するコマンド
    • ZCOUNT : 指定したスコア範囲の要素数を取得するコマンド
    • ZREVRANGE : 指定した範囲の要素を逆順で取得するコマンド

これらは、直接Redisを直接使う必要がある。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
> REDIS.rpush("list1", 1)
=> 1
> REDIS.rpush("list1", 2)
=> 2
> REDIS.lpush("list1", 3)
=> 3
> REDIS.lrange("list1",0,-1)
=> ["3", "1", "2"]
> REDIS.llen("list1")
=> 3
> REDIS.ltrim("list1",0,1)
=> "OK"
> REDIS.lrange("list1",0,-1)
=> ["3", "1"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
> REDIS.zadd("slist1",0,0)
=> true
> REDIS.zadd("slist1",1,1)
=> true
> REDIS.zadd("slist1",2,2.3)
=> true
> REDIS.zrange("slist", 0, -1)
=> []
> REDIS.zrange("slist", 0, 1)
=> []
> REDIS.zrange("slist1", 0, -1)
=> ["0", "1", "2.3"]
> REDIS.zrevrange("slist1", 0, -1)
=> ["2.3", "1", "0"]

実装

前提

実装方針として以下の前提を置きます。

モデル

モデルとして以下があるとします。

classDiagram
  User "1" --> "*" Article : has_many
  User "1" --> "*" Follow : has_many (as follower)
  User "1" --> "*" Follow : has_many (as following)
  
  class User {
      +int id PK
      +string name
      +datetime created_at
      +datetime updated_at
  }
  
  class Article {
      +int id PK
      +int user_id FK
      +text comment
      +datetime created_at
      +datetime updated_at
  }
  
  class Follow {
      +int id PK
      +int follower_id FK
      +int following_id FK
      +datetime created_at
      +datetime updated_at
  }

処理

  • Userは、記事を投稿できる。(テストのため、Rakeタスクで定期的に投稿する)
  • Userは、他のユーザーをフォローできる。
    • フォローしたとき、フォロー対象のユーザーの投稿がタイムラインに表示される。(遅延処理によりタイムラインを更新する。)
  • Userは、フォローしているユーザーすべての投稿がマージされたタイムラインを閲覧できる。
    • ただし、最大500件までを表示するものとする。
  • 他のユーザーをアンフォローしたとき、タイムラインからそのユーザーの投稿が消える。

ソース

モデル

app/models/user.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class User < ApplicationRecord
has_many :articles, dependent: :destroy

# Following relationships
has_many :active_follows, class_name: 'Follow', foreign_key: 'follower_id', dependent: :destroy
has_many :passive_follows, class_name: 'Follow', foreign_key: 'following_id', dependent: :destroy

has_many :following, through: :active_follows, source: :following
has_many :followers, through: :passive_follows, source: :follower

def follow(user)
following << user unless following?(user) || self == user
end

def unfollow(user)
following.delete(user)
end

def following?(user)
following.include?(user)
end
end
app/models/follow.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
class Follow < ApplicationRecord
belongs_to :follower, class_name: 'User'
belongs_to :following, class_name: 'User'

validates :follower_id, uniqueness: { scope: :following_id }
validate :cannot_follow_self

private

def cannot_follow_self
errors.add(:following_id, "can't follow yourself") if follower_id == following_id
end
end
app/models/article.rb
1
2
3
class Article < ApplicationRecord
belongs_to :user
end

コントローラー

app/controllers/articles_controller.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ArticlesController < ApplicationController
before_action :set_current_user

def index
page = params[:page].to_i
page = 1 if page < 1
per_page = 20

# Redisから(per_page + 1)件取得してhas_next_pageを判定
ids = REDIS.zrevrange("timeline:#{@current_user.id}", (page - 1) * per_page, page * per_page).map(&:to_i)

has_next_page = ids.size > per_page
ids = ids.first(per_page)

# Redisの順序を維持するためindex_byを使用
articles_by_id = Article.where(id: ids).includes(:user).index_by(&:id)
articles = ids.map { |id| articles_by_id[id] }.compact

render json: {
articles: articles.map { |a| { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } },
has_next_page: has_next_page,
next_page: has_next_page ? page + 1 : nil
}
end

private

def set_current_user
@current_user = User.first
end
end
app/controllers/users_controller.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class UsersController < ApplicationController
before_action :set_current_user

def index
@pagy, @users = pagy(User.where.not(id: @current_user.id).order(:name))
end

def show
@user = User.includes(:articles, :followers, :following).find(params[:id])
@articles = @user.articles.order(created_at: :desc)
end

def follow
user = User.find(params[:id])
@current_user.follow(user)

UpdateFollowTimelineJob.perform_later(@current_user.id, user.id)

redirect_to users_path, notice: "#{user.name}をフォローしました"
end

def unfollow
user = User.find(params[:id])
@current_user.unfollow(user)
UpdateUnfollowTimelineJob.perform_later(@current_user.id, user.id)
redirect_to users_path, notice: "#{user.name}のフォローを解除しました"
end

private

def set_current_user
# 仮想的なログインユーザーとして最初のUserを使用
@current_user = User.first
end
end
app/controllers/timelines_controller.rb
1
2
3
4
class TimelineController < ApplicationController
def index
end
end

ジョブ

app/jobs/update_follow_timeline_job.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class UpdateFollowTimelineJob < ApplicationJob
queue_as :default

def perform(follower_id, followed_user_id)
timeline_key = "timeline:#{follower_id}"
user = User.find(followed_user_id)

REDIS.pipelined do |pipeline|
user.articles.find_each do |article|
pipeline.zadd(timeline_key, article.id, article.id)
end

pipeline.zremrangebyrank(timeline_key, 0, -501) # 最新500件を残す
end
end
end
app/jobs/update_unfollow_timeline_job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class UpdateUnfollowTimelineJob < ApplicationJob
queue_as :default

def perform(follower_id, unfollowed_user_id)
timeline_key = "timeline:#{follower_id}"
user = User.find(unfollowed_user_id)

# フォロー解除されたユーザーの記事をタイムラインから削除
# タイムラインに500件のデータを保持するため、
# 特定ユーザーの投稿をタイムラインから削除するとき直近500件を削除すれば十分
article_ids = user.articles.last(500).pluck(:id)

if article_ids.any?
REDIS.pipelined do |pipeline|
article_ids.each do |article_id|
pipeline.zrem(timeline_key, article_id)
end
end
end
end
end
app/jobs/add_article_job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class AddArticleJob < ApplicationJob
queue_as :default

def perform
users = User.order("RANDOM()").limit(300)
return unless users.any?

current_user = User.first # テストとして1人目のユーザーをログインユーザーとして取り扱う。

users.each do |user|
Article.create!({
user_id: user.id,
comment: Faker::Lorem.paragraph,
})
timeline_key = "timeline:#{current_user.id}"

if Follow.exists?(follower_id: current_user.id, following_id: user.id)
REDIS.pipelined do |pipeline|
user.articles.find_each do |article|
pipeline.zadd(timeline_key, article.id, article.id)
end

pipeline.zremrangebyrank(timeline_key, 0, -501) # 最新500件を残す
end
end
end
rescue => e
Rails.logger.error "[AddArticleJob] Error: #{e.message}"
end
end

フロント側

フロント側の構成は、Rails x React で無限スクロールとストリーミング とほぼ同様。
ただし、SSEではなく単純にページングを利用したfetch APIの呼び出し。

挙動

ソースも参照しながら、実際の挙動を追うと以下のようになっている。

フォロー時のタイムライン更新

UserA が UserB をフォローすると、UserB の投稿が UserA のタイムラインに追加される。

sequenceDiagram
    participant UserA as UserA (フォローする側)
    participant Rails as Rails Server
    participant Job as UpdateFollowTimelineJob
    participant Redis as Redis
    participant DB as PostgreSQL

    UserA->>Rails: POST /users/:id/follow
    Rails->>DB: Follow.create(follower: UserA, following: UserB)
    Rails->>Job: perform_later(UserA.id, UserB.id)
    Rails-->>UserA: redirect (フォロー完了)
    
    Note over Job,Redis: 非同期処理
    Job->>DB: UserB.articles を取得
    DB-->>Job: [Article#101, Article#102, Article#103]
    
    Job->>Redis: ZADD timeline:UserA.id 101 101
    Job->>Redis: ZADD timeline:UserA.id 102 102
    Job->>Redis: ZADD timeline:UserA.id 103 103
    Job->>Redis: ZREMRANGEBYRANK timeline:UserA.id 0 -501
    Note over Redis: 最新500件を保持

具体例:

操作前 (UserA のタイムライン) 操作後 (UserA のタイムライン)
timeline:1 = [50, 49, 48, …] timeline:1 = [103, 102, 101, 50, 49, 48, …]

UserB の記事(ID: 101, 102, 103)がタイムラインに追加される。

アンフォロー時のタイムライン更新

UserA が UserB をアンフォローすると、UserB の投稿が UserA のタイムラインから削除される。

sequenceDiagram
    participant UserA as UserA (アンフォローする側)
    participant Rails as Rails Server
    participant Job as UpdateUnfollowTimelineJob
    participant Redis as Redis
    participant DB as PostgreSQL

    UserA->>Rails: DELETE /users/:id/unfollow
    Rails->>DB: Follow.delete(follower: UserA, following: UserB)
    Rails->>Job: perform_later(UserA.id, UserB.id)
    Rails-->>UserA: redirect (アンフォロー完了)
    
    Note over Job,Redis: 非同期処理
    Job->>DB: UserB.articles.last(500).pluck(:id)
    DB-->>Job: [101, 102, 103]
    
    Job->>Redis: ZREM timeline:UserA.id 101
    Job->>Redis: ZREM timeline:UserA.id 102
    Job->>Redis: ZREM timeline:UserA.id 103
    Note over Redis: UserBの投稿を削除

具体例:

操作前 (UserA のタイムライン) 操作後 (UserA のタイムライン)
timeline:1 = [103, 102, 101, 50, 49, 48, …] timeline:1 = [50, 49, 48, …]

UserB の記事(ID: 101, 102, 103)がタイムラインから削除される。

タイムライン閲覧時

sequenceDiagram
    participant UserA as UserA
    participant React as React (Frontend)
    participant Rails as Rails Server
    participant Redis as Redis
    participant DB as PostgreSQL

    UserA->>React: タイムラインページ表示
    React->>Rails: GET /articles?page=1
    Rails->>Redis: ZREVRANGE timeline:UserA.id 0 20
    Redis-->>Rails: [103, 102, 101, 50, 49, ...]
    Rails->>DB: Article.where(id: [...]).includes(:user)
    DB-->>Rails: [Article#103, Article#102, ...]
    Rails-->>React: JSON {articles: [...], has_next_page: true}
    React-->>UserA: 記事一覧を表示

Redisから記事IDを取得し、その順序を維持したままDBから記事データを取得して返却する。

動きの様子

比較

「fan-out 戦略いいじゃん」と安易に飛びつかないため、比較をしてみる。

以下ソースは、タイムラインの取得処理を「見るとき集める」戦略で実装したものを用意し、データの作成までの時間を計測する。

app/controllers/articles_controller.rb(見るとき集める版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ArticlesController < ApplicationController
before_action :set_current_user

def index
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

page = params[:page].to_i
page = 1 if page < 1
per_page = 20

following_ids = @current_user.following.pluck(:id)
articles = Article.includes(:user).where(user_id: following_ids).order(created_at: :desc).limit(per_page + 1).offset((page - 1) * per_page)

has_next_page = articles.size > per_page

articles_hash = articles.limit(per_page).map { |a| { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } }

total_time = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time) * 1000).round(2)

REDIS.lpush("honesty_query_timeline_load_times", total_time)
REDIS.ltrim("honesty_query_timeline_load_times", 0, 99) # 最新100件
times = REDIS.lrange("honesty_query_timeline_load_times", 0, -1).map(&:to_f)

avg_time = (times.sum / times.size).round(2)
max_time = times.max.round(2)
min_time = times.min.round(2)
p "[ArticlesController#index] 件数: #{times.size}, Avg: #{avg_time}ms, Max: #{max_time}ms, Min: #{min_time}ms"


render json: {
articles: articles_hash,
has_next_page: has_next_page,
next_page: has_next_page ? page + 1 : nil
}
end

# 省略
end
app/controllers/articles_controller.rb(fan-out戦略版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ArticlesController < ApplicationController
before_action :set_current_user

def index
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

page = params[:page].to_i
page = 1 if page < 1
per_page = 20

# Redisから(per_page + 1)件取得してhas_next_pageを判定
ids = REDIS.zrevrange("timeline:#{@current_user.id}", (page - 1) * per_page, page * per_page).map(&:to_i)

has_next_page = ids.size > per_page
ids = ids.first(per_page)

# Redisの順序を維持するためindex_byを使用
articles_by_id = Article.where(id: ids).includes(:user).index_by(&:id)
articles = ids.map { |id| articles_by_id[id] }.compact
articles_hash = articles.map { |a| { id: a.id, comment: a.comment, created_at: a.created_at, user_name: a.user.name } }

total_time = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time) * 1000).round(2)

REDIS.lpush("fan-out_timeline_load_times", total_time)
REDIS.ltrim("fan-out_timeline_load_times", 0, 99) # 最新100件
times = REDIS.lrange("fan-out_timeline_load_times", 0, -1).map(&:to_f)

avg_time = (times.sum / times.size).round(2)
max_time = times.max.round(2)
min_time = times.min.round(2)
p "[ArticlesController#index] 件数: #{times.size}, Avg: #{avg_time}ms, Max: #{max_time}ms, Min: #{min_time}ms"


render json: {
articles: articles_hash,
has_next_page: has_next_page,
next_page: has_next_page ? page + 1 : nil
}
end

# 省略
end

実行時に10数件フォローをしてから実行した。
動かしてみると、以下のような結果になった。

戦略 平均ロード時間 最大ロード時間 最小ロード時間
見るとき集める 30.6ms 74.59ms 23.92ms
fan-out戦略 11.84ms 37.51ms 7.94ms
1
2
3
4
5
見るとき集める
"[ArticlesController#index] 件数: 100, Avg: 30.6ms, Max: 74.59ms, Min: 23.92ms"

fan-out戦略
"[ArticlesController#index] 件数: 100, Avg: 11.84ms, Max: 37.51ms, Min: 7.94ms"

ローカルで実行というものだけど、平均で約3倍、最大で約2倍の差が出ている。
データ量・フォロー対象者が増えれば、もっと差は出る可能性がある。


fan-out戦略を試してみた。
比較してみると、確かに見るとき集める戦略よりも高速にタイムラインを表示できている。

ただし、Twitterがこの戦略にたどり着くまでに、いろいろとためしていてたどりついたもので、第一選択肢かといえば疑問符がつく。

「同じようなタイムラインでも流れ自体がそもそも遅い」などあれば、ある程度単純にキャッシュすれば満足できる可能性もある。

では。