The Challenge(チャレンジ)

個人的にチャレンジは好きです。Instacartはいろいろな面白いチャレンジがあります。私の仕事はShopper Successチームでスムーズなcustomersとshoppers関係を維持することです。

主なタスクは消費者とshoppers両方に安定した供給を提供することです。その中で重要なタスクはfulfillment systems (フルフィルメント システム;受注管理システム)の安定性を管理、維持することです。安定したシステムがあってこそ、shoppersはユーザーに良いサビースを提供できます。Instacartのデータベースには多くのデータがあります。しかも成長するにつれて急速に増えていきます。コードはいろんなアプリやサービスで動作しますが、また二つのデータベースしか持っていません。primary と item catalogがこの二つです。すぐに容量は足りなくなりました。

Objectives(目標、目的)

アーキテクチャーの第一歩は、常に最適化しているものを見つけることです。私たちはPostgresで大きな成功を収め、変更する必要はないです。性能はかなり高く、datatypes とquery functionsも非常に使いやすいです。

Space(スペース:容量) 

私たちの最も緊急な優先事項は、より多くのスペースを見つけることでした。私たちは時間稼ぎをしつつ、Amazon RDSの6TB制限(終了前に12TBに引き上げられました)に向き合わなければなりませんでした。私たちは幾つかのイニシアチブはありましたが、この中で少なくとも問題を解決できるように望んでいました。

Isolation(分離)

システムの堅牢性を向上させる1つの方法は、システムを分離させ、インシデントの頻度と影響の幅を制限することです。ここで少し説明したいことがあります。私たちは、食料品を選んで顧客に提供するshoppersと顧客に従事しますが、私たちはまた、小売業者や消費者包装商品会社と緊密な協力関係にあります。ビジネスの各分野には、ニーズに応えるために異なるSLA(サービス品質保証契約)が用意されています。私たちの買い物客には、高い安定性を提供しなければなりません。当社のフルフィルメント(shopper)システムがダウンした場合、当社は発注することはできませんが、結局一番、Instacartに頼る人たちを収入面で失望させることです。

Performance(処理能力) 

もちろん高い性能(Performance)を維持するのは大切なゴールです。しかし、この段階でPerformanceを激変するのは望んでいませんでした。Replicasはほとんどのread loadは処理できますし、私たちのwrite loadもまだ対応できました。

Application-Level Asynchronous Data Pump with Multiple Postgres Databases(複数のPostgresデータベースを持つアプリケーションレベルの非同期Data Pump)

私たちが持っている何百ものテーブル(図)を調べた結果、複数のドメインで必要とされたのはわずか数十でした。これらのテーブルには、多くの場合、APIを不適切なものにする多数の結合や複雑なクエリがあります。たとえば、顧客が注文すると、その注文はCustomersドメインのCustomersデータベースに書き込まれます。顧客データベースには、顧客が注文にアクセスして変更するために必要な注文が書き込まれます。ただし、この注文に関するデータの一部は、フルフィルメントドメインにとって重要です。分離を提供するためには、データをどこかにコピーする必要がありました。 Postgresは非常に強力で、特にAuroraのような水平方向のオプションを使用すると、それから離れていく理由は見当たりませんでした。注文を完了し、データベース間で必要なデータを共有するのに必要なすべてのデータを格納する専用のフルフィルメントデータベースを作成することにしました。これにより、完全な分離と関連するデータのより大きな所有権が可能になります。

Multiple Databases(複数データベース)

Railsで簡単に複数のデータベースに接続できます。

class FulfillmentRecord < ActiveRecord::Base
  self.abstract_class = true
  establish_connection "fulfillment_#{Rails.env}".to_sym
end
module FulfillmentRecordConnection
  extend ActiveSupport::Concern
  included do
    def self.retrieve_connection
      FulfillmentRecord.connection
    end
  end
end
class SomeFulfillmentModel < FulfillmentRecord
    # Connects to Fulfillment database
end
class SomeCustomersModel < CustomersRecord
    # Connects to Customers database
end
class Fulfillment::SomeCustomersModel < SomeCustomersModel
  include FulfillmentRecordConnection
  # Inherits from the customers model, but connects to the 
  # fulfillment database
end

上記の例は、ActiveRecordで異なる接続を持つモデルを1つのアプリケーション内に持つ方法を示しています。include FulfillmentRecordConnectionこの指令でsubclassの接続を変更し、parent class や reopened classから全ての機能を受け継ぎます。

Master & Copied Tables(マスターテーブルとコピーテーブル)

複数の場所(データーベース)でデータを管理する場合、データが同期し、分岐しないようにする必要があります。ある場所に適用された変更は、他の場所で適用する必要があります。後ほどもっと詳しく説明します。

複数のドメインで必要とされるすべてのテーブル(図)を調べ、そのほとんどが単一のドメインからのすべての書き込みを持っていることがわかりました。
ほとんどのテーブルのwritesは単一ドメインから現れます。私たちにとってこれはいい兆候です。なぜかというと「schemaは実は間違いでした」と示したからです。そしてテーブルを別のエンティティに分割するときが来たことを示すよい指標となりました。なぜならmaster-slave synchronizationはmaster-masterより簡単であるため、これは特に便利でした。

共有テーブルごとに、テーブルと書き込みが存在する所有ドメインを決定しました。このテーブルを必要とする他のドメインには、“copied table.”と呼ばれるものがあります。replicas やslavesと呼ばないよう注意しました。私たちのコピーされたテーブルは、Postgresのプロダクションユーザーが読み取り専用にして、偶発的な変更や基本的なアプリケーションレベルの書き込み保護を防ぎます。ここにCopiedTableの単純化された実装があります。

module CopiedTable
  extend ActiveSupport::Concern
  def readonly?
    !@force_write && !new_record?
  end
  INSTANCE_FORCE_ACTIONS = [:update, :update!, :destroy, :destroy!, :save, :save!, :update_columns]
  INSTANCE_FORCE_ACTIONS.each do |action|
    define_method("force_#{action}") do |*args, &block|
      @force_write = true
      result = send("#{action}", *args, &block)
      @force_write = false
      result
    end
  end
  included do
    def apply_published_changes(params)
      id = params[:id]
      attrs = filtered_attributes(params[:attrs])
      action = params[:action] # create, update, or delete
      # Do (force_)action with attrs
    end
    def filtered_attributes(attributes)
      # Removes attributes that do not exist on the copied table
      present = column_names.each_with_object({}) { |key, hash| hash[key.to_sym] = true }
      attributes.select { |key, v| present[key.to_sym] }
    end
  end
end

簡単に言うと、不注意な修正を防止し一般的なペイロードの変更を簡単にして適応します。これを使用するならCopiedTableをモデル(コピーバージョン)に含めます。

Application-Level Data Pump(アプリケーションレベルの Data Pump)

Postgresには、データの共有と複製のためのデータベースレベルのメカニズムがいくつかあります。レプリカは読み込み負荷を分離するのに最適ですが、スペースや書き込み分離に役立ちません。外部データをキャッシュするために、外部データ・ラッパー(FDW)とマテリアライズド・ビューを組み合わせて実験しました。実験は成功しませんでしたが、マテリアライズド・ビューのリフレッシュが大規模に問題がある可能性があることがわかりました。

アプリケーションレベルで複製することで、より大きな論理制御が可能になります。 2つの異なるアプリが異なるデータを気にすることがあります。おそらくCustomersドメインはすべての注文のフルレコードを必要としますが、フルフィルメントドメインは現在の注文を提供するための情報しか必要としません。フルフィルメントドメインは、過去の注文に対する更新情報またはクーポンが使用された情報を無視することができます。パフォーマンスのオーバーヘッドは、私たちにとって理にかなったトレードオフでした。

これを容易にするため、Hubにチェンジします。include PublishChanges を指令すればできます。

module PublishChanges
  extend ActiveSupport::Concern
  included do
    after_commit :publish_changes
  end
  def hub_publish_changes
    # Publish data for subscriber
    #   "#{model_name.underscore}_changed", e.g. "shopper_changed"
    #   type:    model_name, e.g. Shopper
    #   id:      self.id,
    #   action:  action, e.g. :create
    #   changes: transformed_changes, e.g. { foo: :new_value }
    # Implementation not shown
  end
end

次に、更新を受け取るためのCopiedTableモジュールが用意されています。コピーされたテーブルのこのスニペットはapply_published_changesクラスメソッドを示しています。これにより、パブリッシュコンシューマはModel.apply_published_changes(params)を呼び出して、コピーされたテーブルを作成、更新、または削除します。消費者は、ドメインが気にしない行(たとえば過去の注文合計)を無視することができます。(例:昔の注文合計)。

module CopiedTable   extend ActiveSupport::Concern

  included do
    def apply_published_changes(params)
      id = params[:id]
      attrs = filtered_attributes(params[:attrs])
      action = params[:action]
      # Do action with attrs
    end
  def filtered_attributes(attributes)
      # Removes attributes that do not exist on the copied table
      present = column_names.each_with_object({}) { |key, hash| hash[key.to_sym] = true }
      attributes.select { |key, v| present[key.to_sym] }
    end
  end
end

Eventual Consistency(最終的な一貫性)

最終的な一貫性は、基本的に複製されたデータソースが最終的に元のデータソースにあるすべてのデータを保持することを意味する明確な概念ですが、おそらく遅延が存在します。これは、読み取りと書き込みの両方の高性能を可能にする一般的なトレードオフです。 2つのデータベースがアトミックに結合されている場合、いずれかのパフォーマンスが遅いと、どちらか一方に悪影響が及ぼされます。このパターンでユーザーの行動はFulfillment domainに影響をされません。

Verifying Integrity(整合性の検証)

常にデータが同期しなくなるか、大幅に後退するリスクがあります。バグがパブリッシングの変更を妨げるか、電源コードにつまづく場合は、スイーパーとの違いを特定して解決する必要があります。コピーされたモデルごとに定期的にスケジュールされたジョブが実行され、すべての更新が確実に行われます。最初のデータベースは、習慣から索引をつけたupdated_atと、最初のデータベースのIDで2番目のデータベースを照会できます。バッチで実施する場合、このアプローチはうまくいっています。

Refactoring Data(リファクタリング データ)

素晴らしい! 私たちは新しいアーキテクチャを意識していますが、単にいくつかのPRを結合して、ダウンタイム、エラー、またはデータの損失なしにテラバイトのデータを移動させることはできません。ボリューム、アクセスパターン、および機密性によって、すべてのデータ移行が異なります。何百ものテーブルを移動した後、私たちはライブシステムに悪影響を与えずにデータを移動するために効果的なプロセスを開発しました。

Identify dependencies(依存関係を特定する)

テーブルを移動する前に、移動する必要があるすべての結合、トリガー、その他の関連するエンティティを見つけることが重要です。この作業についてはコツがあります。まず、コードベースのモデル名とテーブル名の両方を検索し、簡単に見つけられるものを移動しました。次に、ブランチでモデルとテーブルを削除して移動したかったので、私が逃したクエリをテストで見つけることができました。最後に、プロファイリングツールとログを使用して実行されたクエリを調べました。

Consolidate writes(書き込みを統合)

ほとんどのテーブルの初期状態は、特定のテーブルの単一ドメインのデータベースを読み書きする複数のドメインでした。 信頼性の高いレプリケーションと検証を使用して明確に定義されたアクセスパターンを得るために、単一のドメインでそのテーブルを所有したかったのです。 このコードを移動すると、古いモノリス(一本柱)から残ったコードやデッドコードを見つけることができました。 どのような場合でも、複数のドメインが同じテーブルに書き込んでいるときは、コードの匂い(深刻な問題が存在することを示す何らかの兆候)であり、それらのモデルを分割またはリファクタリングすることに決めました。

Establish Models(モデルを確立する)

次は新たなデーターベースでschema とmodelを作成します。structure.sql or schema.rbをコピーするのはいいスタートポイントです。また、古いデータや不正なデータをクエリする人がいるかもしれません。元のテーブルにあったものをコピーするために使用された検証のないモデルを作成するのが最も効果的だとわかりました。

class Foobar < ActiveRecord::Base
  # Existing full model definition
  # ...
end
class Fulfillment::Foobar < FulfillmentRecord
  include CopiedTable
  # Empty model, except for serialized fields
  serialize :data
end

Backfill data(バックフィルデータ)

私たちのエンジニアの一人は、postgresデータベース間でデータを転送するためのpgsyncという素晴らしいツールを開発しました。われわれは、難読化されたステージングデータをローカルの開発環境に引き渡すために最も頻繁に使用しますが、本番データベース間でも機能します。どれだけのデータを移動しているかによって、時間がかかることがあります。おそらく、いつ実行したのかを記録しておくと、その時点から将来のアップデートを同期することができます。

pgsync the_table --in-batches --from $DATABASE_1_URL --to $DATABASE_2_URL --sleep 0.1 --to-safe

Synchronous Dual-write(同期デュアルライト)

“cutover,” の準備が整いますので、テーブルが同期していることが重要です。 “cutover,” して1分の遅れがある場合、そのデータを失うリスクがあります。確かに、予防する方法はありますが、競争条件に起因する問題を軽減する簡単な方法は、本当に短い競合をすることです。この設定は、コールバック内の2番目のデータベースに書き込んで最初のデータベースに書き込みます。

module DualWrite
  extend ActiveSupport::Concern
  class_methods do
    def dual_write(to:)
      self.to_model = to
    end
  end
  included do
    cattr_accessor :to_model
    after_save :dual_write_to_db
    after_destroy :dual_destroy_from_db
  end
  protected
  def prepare_attributes
    present = Set.new(to_model.constantize.column_names)
    attributes.select { |key, v| present.include?(key) }
  end
  def dual_write_to_db
    return unless enabled?
    r = to_model.constantize.find_or_initialize_by(id: id)
    r.assign_attributes(prepare_attributes)
    r.respond_to?(:force_save!) ? r.force_save!(validate: false) : r.save!(validate: false)
  end
  def dual_destroy_from_db
    return unless enabled?
    r = to_model.constantize.find_by(id: id)
    return unless r
    r.respond_to?(:force_destroy) ? r.force_destroy : r.destroy
  end
  def enabled?
    to_model && self.class.to_s != to_model
  end
end

注意:self.attributesはprimary keyが含まれています。だからこそauto-incrementを増さ(incrementing)ずに新たな列が作れます。これは下記のように手動でできます:

Fulfillment::Foobar.connection.execute("SELECT SETVAL('foobar_id_seq', COALESCE(MAX(id), 1) ) FROM foobar;")

Cutover(新たにシステムが稼動を開始)

我々は万全の準備をし、新たに飛躍します。モデルのデータベースを変更し、古いデータベースの新しいモデルを作成し、もう一方の方向を二重書きする単一のPRを作成します。cutoverが有効になる直前にsweeperを無効にし、直後にもう一方の方向に再び有効にする必要があります。私は、クリーンロールバックを容易にするために、cutover時に同期をとることをお勧めします。両方のデータベースを照会し、データが良好であるかどうかを確認します。私が見つけたシンプルなクエリは下記になります:

SELECT COUNT(id), AVG(id) FROM foo_bar WHERE updated_at < now() - INTERVAL '10 seconds'

これは情報源を確認する時と同じ列が持ってるtablesをコピーする時にかなり役に立ちます。

Asynchronous Replication(非同期レプリケーション)

cutoverがスムーズに行われ、ロールバックする必要がないという確信が得られたら、同期を非同期にするときです。そのため、分離の実質的な利点を得ることができます。同期デュアル書き込みを削除する前に、パブリッシュと変更の使用を開始します。いったん非同期で書いたら、2つのデータベースを切り離しました! 第1データベースへの書き込みは、正常な状態にある第2データベースに依存せず、読み取りはローカルデータベース上にあります。

Dropping Old Tables(古いテーブルの削除)

場合によっては、古いデータベースのテーブルが必要なくなりました。古いtablesを削除するのはリスク(ロシアンルーレットのような)がありますが、ここにもコツがあります。

記録する(Archive)ことはいい方法です。もちろん間違えることはあまり気にしなくても大丈夫です。あなたが間違い、失敗したクエリを許容できれば、権限の取り消しはテーブルを使用していないことを確認する実質的な方法となります。私の場合なら、すぐ復元するために、いつもtext editorでgrant statementを書きます。もし今使ってるシステムがfailing queriesに対応できないなら、ちゃんとtableの統計値を保存しましょう。難しいですけど、安全なやり方です。

そして追跡(tracking)の機能をオンして、tableをリセットして、待ちます。access countsはゼロになったら大丈夫です。さらに詳しい説明はこちらです:Postgres docs.

Conclusion(結論)

Instacartには多くのデータがあり、データベース間でデータを複製するためのアプリケーションレベルのdata pump patternが採用されています。このアプローチは、すべてのテーブルで機能するように一般的ですが、非正規化、フィルタリング、およびその他のパフォーマンスの最適化を可能にするほどにカスタマイズ可能です。

すべての書き込みを最終的な所有ドメインに移動し、両方のデータベースに同時にデュアル書き込みし、truthのソースを切り取り、非同期に複製することから始めます。

もし私の経験がどこかで役に立てれば嬉しいです。もし君が私たちのサービスについて新しいアイディアがあれば、是非Instacartに応募してください!

タイトル:Scaling at Instacart: Distributing Data Across Multiple Postgres Databases with Rails

作者:Doug Hyde

原文URL:https://tech.instacart.com/scaling-at-instacart-distributing-data-across-multiple-postgres-databases-with-rails-13b1e4eba202

今すぐシェアしよう!
今すぐシェアしよう!