FANCOMI Ad-Tech Blog

株式会社ファンコミュニケーションズ nend・新規事業のエンジニア・技術ブログ

アクセスログの残しかた ~ Aerospike×DynamoDB

 こんにちは、k_oomoriです。突然ですが、アクセスログを記録するにあたり、以下の要件を満たすデータベースが必要になりました。

  • ユーザを区別するID(uid)、アクセス時刻、アクセスURL、その他必要に応じて追加される属性を記録する
  • 特定のuidに対して過去のアクセスログをまとめて取得できる
  • 過去400日(約13か月分)のログを蓄積できる

今回は、AerospikeとDynamoDBを比較検討しましたのでその結果をまとめようと思います。

DynamoDB

 Amazon DynamoDBとは、AWSによって提供されているフルマネージドNoSQLデータベースサービスです*1。DynamoDBではデータは「テーブル」という単位に保存し、テーブルには検索のためのプライマリーキー(PK)が必要となります。DynamoDBのPKの付け方には2種類あって、一つは「ハッシュキー」、もう一つは「ハッシュキー+レンジキー」と呼ばれます*2。ハッシュキーはRDBにおける単一カラムに対するPKのようなもので、KVS的な使い方をするときに使います。もちろんハッシュキーの値に重複は許されません。対してハッシュキー+レンジキーはRDBの複合プライマリーキーのようなもので、こちらはレンジキーの値が違えばハッシュキーの値が同じであっても別itemとして登録できます。今回の要件に対してはハッシュキー=uid、レンジキー=アクセス時刻とするのがよさそうですので、次のようなテーブル設計で進めます。

uid(ハッシュキー)アクセス時刻(レンジキー)アクセスURL
A2016-07-01 00:00:00http://blog.nex8.net/basicknowledge-for-ec-marketer/
B2016-07-01 00:30:00https://www.nex8.net/
A2016-07-01 01:00:00http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/
B2016-07-01 01:30:00http://tech-blog.fancs.com/
A2016-07-01 02:00:00http://blog.nex8.net/five-merit/
表1. アクセスログのDynamoDBへの保存方法

 ここではScalaからAWS SDK for Javaを使ってデータを取得するコーディング例を紹介します。

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model._
import java.util.HashMap
import scala.collection.JavaConversions._

//DynamoDBクライアントの生成
val dynamoClient = new AmazonDynamoDBClient(
  new BasicAWSCredentials("myAccessKey", "mySecretKey")
)
dynamoClient.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1))

//データの登録
val key = new HashMap[String, AttributeValue]
key.put("uid", new AttributeValue().withS("A"))
key.put("time", new AttributeValue().withS("2016-07-01 00:00:00"))
val updateItems = new HashMap[String, AttributeValueUpdate]
updateItems.put("url", new AttributeValueUpdate().withValue(
  new AttributeValue().withS("http://blog.nex8.net/basicknowledge-for-ec-marketer/"))
)
val updateItemRequest = new UpdateItemRequest().withTableName("access_log")
  .withKey(key).withAttributeUpdates(updateItems)
dynamoClient.updateItem(updateItemRequest)
//長いのでその他のデータの登録はここでは省略します…

//取得クエリ
val keyConditions = new HashMap[String, Condition]()
keyConditions.put("uid", 
  new Condition().withComparisonOperator(ComparisonOperator.EQ.toString)
    .withAttributeValueList(new AttributeValue().withS("A"))
)
keyConditions.put("time", 
  new Condition().withComparisonOperator(ComparisonOperator.BETWEEN.toString)
    .withAttributeValueList(
      new AttributeValue().withS("2016-07-01 00:20:00"), 
      new AttributeValue().withS("2016-07-02 03:00:00")
    )
)
val queryRequest = new QueryRequest().withTableName("access_log")
  .withKeyConditions(keyConditions)
  .withScanIndexForward(false) //降順
  .withAttributesToGet("uid", "time", "url")
val queryResult = dynamoClient.query(queryRequest)
//mutable.Buffer[mutable.Map[String, AttributeValue]]で返ってくるので
//List[Map[String, String]]型に変換
val scalaized = (queryResult.getItems map { _.map {
  case (k, v) => (k, v.getS)
}.toMap}).toList
println(scalaized)
//List(
//  Map(url -> http://blog.nex8.net/five-merit/, uid -> A, time -> 2016-07-01 02:00:00), 
//  Map(url -> http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/, uid -> A, time -> 2016-07-01 01:00:00)
//)

Scalaと言いましたが実際はほとんどJavaですね(笑)。
 それではパフォーマンスの検証をしましょう。今回はURLが短い(http:// +ドメインのみ)ものと長い(2kBほど)もので、クエリで100件、1,000件、10,000件、100,000件ヒットするようなテストデータを用意し、取得にかかる時間を計測したところ、結果は以下のようになりました。時間の単位はミリ秒です。またこの値は計測を5回行いその平均値としています。

件数short URLlong URL
10024 ms18 ms
1,00057 ms183 ms
10,000557 ms1,911 ms
100,0006,435 ms16,341 ms
100件くらいまでなら20ms前後で返ってきますが、件数に比例して時間が延びていく感じですね。1万件を越えたあたりから秒単位の時間がかかってしまうようになるので、リアルタイム処理に利用するのは現実的ではないかもしれません。
 DynamoDBにはprovisioned throughput capacityの設定がありますが、これは並列アクセスをどのくらい捌けるかという量なので、今回のような1発のクエリが重い場合にはcapacityを上げても速くなりません(実際にスループットを10倍にして試してみましたが、ほぼ変わらずでした)。逆にcapacityの設定が少なすぎるとProvisionedThroughputExceededExceptionが発生します。。。
 DynamoDBの利点としては、フルマネージドのためサーバ管理の必要がない点や、データ量が多くなっても実質的にほぼ無尽蔵にデータを溜められることなどが挙げられます。データ量に応じて課金はされますが、1TBのデータを入れても月に$285(@東京リージョン)と、比較的お財布にやさしい金額設定ではないでしょうか。

Aerospike

 AerospikeとはNoSQLに分類される分散KVSです。高速なKVSといえばMemcachedやRedisが有名ですが、これらはデータをメモリ上に保持するため長期間のアクセスログを保存するといったような用途には適していません。それに対してAerospikeではアクセスの高速性を保ちつつSSD上にデータを保持することができるため、大容量データに対応できます。
 このブログでもyu_ishikawaさんがAerospikeに関する解説記事を書いてくれていますので、あまり馴染みがないという方はこちらを参照してみてください。

List型

 AerospikeにはListsというデータ型が存在し、1つのrecordの中に複数のデータを持つことができます。よって表1に相当するものはAerospikeでは次のように表現できます。

uidログ
A{time:"2016-07-01 00:00:00", url:"http://blog.nex8.net/basicknowledge-for-ec-marketer/"}
{time:"2016-07-01 01:00:00", url:"http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/"}
{time:"2016-07-01 02:00:00", url:"http://blog.nex8.net/five-merit/"}
B{time:"2016-07-01 00:30:00", url:"https://www.nex8.net/"}
{time:"2016-07-01 01:30:00", url:"http://tech-blog.fancs.com/"}
表2. アクセスログのAerospikeへの保存方法

ではAerospike Java clientを用いてScalaから操作してみましょう。

import com.aerospike.client.{AerospikeClient, Key}
import com.aerospike.client.cdt.ListOperation
import com.aerospike.client.policy.{BatchPolicy, ClientPolicy, WritePolicy}
import com.aerospike.client.Value.MapValue
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

//Aerospikeクライアントの生成(3000はポート番号)
val client = new AerospikeClient(new ClientPolicy, "hostname", 3000)

//データの登録
val binName = "log"
val policy = new WritePolicy(client.writePolicyDefault)
policy.expiration = 3600 //レコードをexpireさせたい場合[秒]
//↓登録データにキー自体を含めたい場合
//↓(Aerospikeの内部ではキーのハッシュ値が使用され、
//↓明示的に指定しない限りキーの値は保存されない)
policy.sendKey = true
val keyA = new Key("namespace", "access_log", "A")
val logsA = List(
  Map("time" -> "2016-07-01 00:00:00", "url" -> "http://blog.nex8.net/basicknowledge-for-ec-marketer/"),
  Map("time" -> "2016-07-01 01:00:00", "url" -> "http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/"),
  Map("time" -> "2016-07-01 02:00:00", "url" -> "http://blog.nex8.net/five-merit/")
).map(new MapValue(_))
client.operate(policy, keyA, ListOperation.insertItems(binName, 0, logsA))//0はListのindexを指定
val keyB = new Key("namespace", "access_log", "B")
val logsB = List(
  Map("time" -> "2016-07-01 00:30:00", "url" -> "https://www.nex8.net/"),
  Map("time" -> "2016-07-01 01:30:00", "url" -> "http://tech-blog.fancs.com/")
).map(new MapValue(_))
client.operate(policy, keyB, ListOperation.insertItems(binName, 0, logsB))

//取得クエリ
client.get(new BatchPolicy, Array(keyA, keyB), binName) foreach { x =>
  val result: List[Map[String, String]] = 
    x.getList(binName).map{
      _.asInstanceOf[java.util.HashMap[String, String]].asScala.toMap
    }.toList
  println(result)
  //List(
  //  Map(time -> 2016-07-01 00:00:00, url -> http://blog.nex8.net/basicknowledge-for-ec-marketer/), 
  //  Map(time -> 2016-07-01 01:00:00, url -> http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/), 
  //  Map(time -> 2016-07-01 02:00:00, url -> http://blog.nex8.net/five-merit/), 
  //)
  //List(
  //  Map(time -> 2016-07-01 00:30:00, url -> https://www.nex8.net/), 
  //  Map(time -> 2016-07-01 01:30:00, url -> http://tech-blog.fancs.com/)
  //)
}
client.close()

しかし、Aerospikeには1レコードの最大容量が1MBまでという制限があるため、このList型の中に全てのログを保存することはできません。解決策としてはuidをキーにするのではなく、uid+日付などデータが1MBに収まるようにキーを細かく分けて保存する方法が挙げられます。
 パフォーマンスの検証結果は以下のようになりました。上記容量制限のため、1つのキーに短いURLでも1000件、長いURLだと100件くらいまでしかログデータが入りませんでしたので、より多いログ件数を実現するためにキーを分割しました。(short URLの100,000件=1,000件×100キー、long URLの100,000件=100件×1,000キーなど)なお、検証に使用したサーバはAWSのm3.medium×3台でクラスタを組んだものになります。

件数short URLlong URL
10012 ms14 ms
1,00047 ms42 ms
10,00054 ms249 ms
100,000225 ms3,771 ms
1,000,0002,756 ms未検証
複数のキーを指定してまとめてとってくる(BatchRead, 上記コード例ではArray(keyA, keyB)として2つまとめている)際に分散KVSのメリットが活かされるのか、同じ量のデータをDynamoDBから取ってくるのに比べて大幅に早くなっています!(最後のセクションで比較の図を用意しています)

Large Ordered Lists (LList)型

 AerospikeにはListsとは別にLarge Ordered Lists (LList)型という容量1MB制限を突破できる特別なデータ型があり、これを使えば表2のデータ構造がそのまま実現できるのですが、一つのキーに巨大データを入れるというのはそもそも分散KVSの思想に沿っていないということですでに非推奨になっています。が、まだ利用可能なのでとりあえずやってみました。

import com.aerospike.client.policy.{ClientPolicy, WritePolicy}
import com.aerospike.client.{AerospikeClient, Key, Value}
import scala.collection.JavaConversions._

val client = new AerospikeClient(new ClientPolicy, "hostname", 3000)
val policy = new WritePolicy
policy.timeout = 1000000
policy.sendKey = true
val key = new Key("namespace", "access_log", "A")
val binName = "log"
val llist = client.getLargeList(policy, key, binName)

//データ書き込み
val map = new JHMap[String, Any]()
//"key"が必須!これでソートされているらしい。 http://www.aerospike.com/docs/guide/llist.html
map.put("key", 1464833977000L)
map.put("url", "http://blog.nex8.net/basicknowledge-for-ec-marketer/")
llist.add(Value.get(map))

//データ取得
val list = llist.findLast(10000)//keyの新しい方から1万件取得
val res = (list map { x =>
  mapAsScalaMap(x.asInstanceOf[java.util.HashMap[String, Any]]).toMap
}).toList
println(res)
//List(Map(url -> http://blog.nex8.net/basicknowledge-for-ec-marketer/, key -> 1464833977000))

パフォーマンスは以下のようになりました。

件数short URLlong URL
1002 ms4 ms
1,00010 ms30 ms
10,000121 ms280 ms
100,000864 ms4,658 ms
まあ、deprecatedと言われているので、これから新たに開発しようとする場合は使用を避けた方がよいでしょう(笑)。

まとめ

 本記事ではアクセスログを保存するという観点で、DynamoDBとAerospikeを操作するScalaコードの紹介と、パフォーマンスの計測を行いました。測定結果を図にまとめると以下のようになります。

f:id:fan_k_oomori:20161227113727p:plain

両対数グラフで傾きはほぼ1の直線になっているので、データ量に比例して取得にかかる時間が長くなるということがわかります。速さではAerospikeでList型を使うのが勝っていましたが、DynamoDBの場合は(お金さえ払えば)AWSがよしなに管理してくれるという利点がある(Aerospikeでは容量が足りなくなってサーバを追加したりサーバが故障したりした際にマイグレーション(クラスタ内でのデータの再分配)が起こったりして管理が結構大変…)ので、メリット・デメリットを把握した上でどのデータベースを使うのかを決めるといいと思います。

*1:ここではDynamoDBの基本的な説明は割愛しますので、DynamoDBのことをあまり知らないという方はRef.[1-2]などを参照していただくとよいかと思います。

*2:詳細な説明はReferences [3-4]などをご参照ください。