こんにちは、k_oomoriです。突然ですが、アクセスログを記録するにあたり、以下の要件を満たすデータベースが必要になりました。
- ユーザを区別するID(uid)、アクセス時刻、アクセスURL、その他必要に応じて追加される属性を記録する
- 特定のuidに対して過去のアクセスログをまとめて取得できる
- 過去400日(約13か月分)のログを蓄積できる
今回は、AerospikeとDynamoDBを比較検討しましたのでその結果をまとめようと思います。
DynamoDB
Amazon DynamoDBとは、AWSによって提供されているフルマネージドNoSQLデータベースサービスです*1。DynamoDBではデータは「テーブル」という単位に保存し、テーブルには検索のためのプライマリーキー(PK)が必要となります。DynamoDBのPKの付け方には2種類あって、一つは「ハッシュキー」、もう一つは「ハッシュキー+レンジキー」と呼ばれます*2。ハッシュキーはRDBにおける単一カラムに対するPKのようなもので、KVS的な使い方をするときに使います。もちろんハッシュキーの値に重複は許されません。対してハッシュキー+レンジキーはRDBの複合プライマリーキーのようなもので、こちらはレンジキーの値が違えばハッシュキーの値が同じであっても別itemとして登録できます。今回の要件に対してはハッシュキー=uid、レンジキー=アクセス時刻とするのがよさそうですので、次のようなテーブル設計で進めます。
uid(ハッシュキー) | アクセス時刻(レンジキー) | アクセスURL |
---|---|---|
A | 2016-07-01 00:00:00 | http://blog.nex8.net/basicknowledge-for-ec-marketer/ |
B | 2016-07-01 00:30:00 | https://www.nex8.net/ |
A | 2016-07-01 01:00:00 | http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/ |
B | 2016-07-01 01:30:00 | http://tech-blog.fancs.com/ |
A | 2016-07-01 02:00:00 | http://blog.nex8.net/five-merit/ |
ここではScalaからAWS SDK for Javaを使ってデータを取得するコーディング例を紹介します。
import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.regions.{Region, Regions} import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.model._ import java.util.HashMap import scala.collection.JavaConversions._ //DynamoDBクライアントの生成 val dynamoClient = new AmazonDynamoDBClient( new BasicAWSCredentials("myAccessKey", "mySecretKey") ) dynamoClient.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1)) //データの登録 val key = new HashMap[String, AttributeValue] key.put("uid", new AttributeValue().withS("A")) key.put("time", new AttributeValue().withS("2016-07-01 00:00:00")) val updateItems = new HashMap[String, AttributeValueUpdate] updateItems.put("url", new AttributeValueUpdate().withValue( new AttributeValue().withS("http://blog.nex8.net/basicknowledge-for-ec-marketer/")) ) val updateItemRequest = new UpdateItemRequest().withTableName("access_log") .withKey(key).withAttributeUpdates(updateItems) dynamoClient.updateItem(updateItemRequest) //長いのでその他のデータの登録はここでは省略します… //取得クエリ val keyConditions = new HashMap[String, Condition]() keyConditions.put("uid", new Condition().withComparisonOperator(ComparisonOperator.EQ.toString) .withAttributeValueList(new AttributeValue().withS("A")) ) keyConditions.put("time", new Condition().withComparisonOperator(ComparisonOperator.BETWEEN.toString) .withAttributeValueList( new AttributeValue().withS("2016-07-01 00:20:00"), new AttributeValue().withS("2016-07-02 03:00:00") ) ) val queryRequest = new QueryRequest().withTableName("access_log") .withKeyConditions(keyConditions) .withScanIndexForward(false) //降順 .withAttributesToGet("uid", "time", "url") val queryResult = dynamoClient.query(queryRequest) //mutable.Buffer[mutable.Map[String, AttributeValue]]で返ってくるので //List[Map[String, String]]型に変換 val scalaized = (queryResult.getItems map { _.map { case (k, v) => (k, v.getS) }.toMap}).toList println(scalaized) //List( // Map(url -> http://blog.nex8.net/five-merit/, uid -> A, time -> 2016-07-01 02:00:00), // Map(url -> http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/, uid -> A, time -> 2016-07-01 01:00:00) //)
Scalaと言いましたが実際はほとんどJavaですね(笑)。
それではパフォーマンスの検証をしましょう。今回はURLが短い(http:// +ドメインのみ)ものと長い(2kBほど)もので、クエリで100件、1,000件、10,000件、100,000件ヒットするようなテストデータを用意し、取得にかかる時間を計測したところ、結果は以下のようになりました。時間の単位はミリ秒です。またこの値は計測を5回行いその平均値としています。
件数 | short URL | long URL |
---|---|---|
100 | 24 ms | 18 ms |
1,000 | 57 ms | 183 ms |
10,000 | 557 ms | 1,911 ms |
100,000 | 6,435 ms | 16,341 ms |
DynamoDBにはprovisioned throughput capacityの設定がありますが、これは並列アクセスをどのくらい捌けるかという量なので、今回のような1発のクエリが重い場合にはcapacityを上げても速くなりません(実際にスループットを10倍にして試してみましたが、ほぼ変わらずでした)。逆にcapacityの設定が少なすぎるとProvisionedThroughputExceededExceptionが発生します。。。
DynamoDBの利点としては、フルマネージドのためサーバ管理の必要がない点や、データ量が多くなっても実質的にほぼ無尽蔵にデータを溜められることなどが挙げられます。データ量に応じて課金はされますが、1TBのデータを入れても月に$285(@東京リージョン)と、比較的お財布にやさしい金額設定ではないでしょうか。
Aerospike
AerospikeとはNoSQLに分類される分散KVSです。高速なKVSといえばMemcachedやRedisが有名ですが、これらはデータをメモリ上に保持するため長期間のアクセスログを保存するといったような用途には適していません。それに対してAerospikeではアクセスの高速性を保ちつつSSD上にデータを保持することができるため、大容量データに対応できます。
このブログでもyu_ishikawaさんがAerospikeに関する解説記事を書いてくれていますので、あまり馴染みがないという方はこちらを参照してみてください。
List型
AerospikeにはListsというデータ型が存在し、1つのrecordの中に複数のデータを持つことができます。よって表1に相当するものはAerospikeでは次のように表現できます。
uid | ログ |
---|---|
A | {time:"2016-07-01 00:00:00", url:"http://blog.nex8.net/basicknowledge-for-ec-marketer/"} |
{time:"2016-07-01 01:00:00", url:"http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/"} | |
{time:"2016-07-01 02:00:00", url:"http://blog.nex8.net/five-merit/"} | |
B | {time:"2016-07-01 00:30:00", url:"https://www.nex8.net/"} |
{time:"2016-07-01 01:30:00", url:"http://tech-blog.fancs.com/"} |
ではAerospike Java clientを用いてScalaから操作してみましょう。
import com.aerospike.client.{AerospikeClient, Key} import com.aerospike.client.cdt.ListOperation import com.aerospike.client.policy.{BatchPolicy, ClientPolicy, WritePolicy} import com.aerospike.client.Value.MapValue import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ //Aerospikeクライアントの生成(3000はポート番号) val client = new AerospikeClient(new ClientPolicy, "hostname", 3000) //データの登録 val binName = "log" val policy = new WritePolicy(client.writePolicyDefault) policy.expiration = 3600 //レコードをexpireさせたい場合[秒] //↓登録データにキー自体を含めたい場合 //↓(Aerospikeの内部ではキーのハッシュ値が使用され、 //↓明示的に指定しない限りキーの値は保存されない) policy.sendKey = true val keyA = new Key("namespace", "access_log", "A") val logsA = List( Map("time" -> "2016-07-01 00:00:00", "url" -> "http://blog.nex8.net/basicknowledge-for-ec-marketer/"), Map("time" -> "2016-07-01 01:00:00", "url" -> "http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/"), Map("time" -> "2016-07-01 02:00:00", "url" -> "http://blog.nex8.net/five-merit/") ).map(new MapValue(_)) client.operate(policy, keyA, ListOperation.insertItems(binName, 0, logsA))//0はListのindexを指定 val keyB = new Key("namespace", "access_log", "B") val logsB = List( Map("time" -> "2016-07-01 00:30:00", "url" -> "https://www.nex8.net/"), Map("time" -> "2016-07-01 01:30:00", "url" -> "http://tech-blog.fancs.com/") ).map(new MapValue(_)) client.operate(policy, keyB, ListOperation.insertItems(binName, 0, logsB)) //取得クエリ client.get(new BatchPolicy, Array(keyA, keyB), binName) foreach { x => val result: List[Map[String, String]] = x.getList(binName).map{ _.asInstanceOf[java.util.HashMap[String, String]].asScala.toMap }.toList println(result) //List( // Map(time -> 2016-07-01 00:00:00, url -> http://blog.nex8.net/basicknowledge-for-ec-marketer/), // Map(time -> 2016-07-01 01:00:00, url -> http://blog.nex8.net/tags-and-cookies-operated-the-retargeting/), // Map(time -> 2016-07-01 02:00:00, url -> http://blog.nex8.net/five-merit/), //) //List( // Map(time -> 2016-07-01 00:30:00, url -> https://www.nex8.net/), // Map(time -> 2016-07-01 01:30:00, url -> http://tech-blog.fancs.com/) //) } client.close()
しかし、Aerospikeには1レコードの最大容量が1MBまでという制限があるため、このList型の中に全てのログを保存することはできません。解決策としてはuidをキーにするのではなく、uid+日付などデータが1MBに収まるようにキーを細かく分けて保存する方法が挙げられます。
パフォーマンスの検証結果は以下のようになりました。上記容量制限のため、1つのキーに短いURLでも1000件、長いURLだと100件くらいまでしかログデータが入りませんでしたので、より多いログ件数を実現するためにキーを分割しました。(short URLの100,000件=1,000件×100キー、long URLの100,000件=100件×1,000キーなど)なお、検証に使用したサーバはAWSのm3.medium×3台でクラスタを組んだものになります。
件数 | short URL | long URL |
---|---|---|
100 | 12 ms | 14 ms |
1,000 | 47 ms | 42 ms |
10,000 | 54 ms | 249 ms |
100,000 | 225 ms | 3,771 ms |
1,000,000 | 2,756 ms | 未検証 |
Large Ordered Lists (LList)型
AerospikeにはListsとは別にLarge Ordered Lists (LList)型という容量1MB制限を突破できる特別なデータ型があり、これを使えば表2のデータ構造がそのまま実現できるのですが、一つのキーに巨大データを入れるというのはそもそも分散KVSの思想に沿っていないということですでに非推奨になっています。が、まだ利用可能なのでとりあえずやってみました。
import com.aerospike.client.policy.{ClientPolicy, WritePolicy} import com.aerospike.client.{AerospikeClient, Key, Value} import scala.collection.JavaConversions._ val client = new AerospikeClient(new ClientPolicy, "hostname", 3000) val policy = new WritePolicy policy.timeout = 1000000 policy.sendKey = true val key = new Key("namespace", "access_log", "A") val binName = "log" val llist = client.getLargeList(policy, key, binName) //データ書き込み val map = new JHMap[String, Any]() //"key"が必須!これでソートされているらしい。 http://www.aerospike.com/docs/guide/llist.html map.put("key", 1464833977000L) map.put("url", "http://blog.nex8.net/basicknowledge-for-ec-marketer/") llist.add(Value.get(map)) //データ取得 val list = llist.findLast(10000)//keyの新しい方から1万件取得 val res = (list map { x => mapAsScalaMap(x.asInstanceOf[java.util.HashMap[String, Any]]).toMap }).toList println(res) //List(Map(url -> http://blog.nex8.net/basicknowledge-for-ec-marketer/, key -> 1464833977000))
パフォーマンスは以下のようになりました。
件数 | short URL | long URL |
---|---|---|
100 | 2 ms | 4 ms |
1,000 | 10 ms | 30 ms |
10,000 | 121 ms | 280 ms |
100,000 | 864 ms | 4,658 ms |
まとめ
本記事ではアクセスログを保存するという観点で、DynamoDBとAerospikeを操作するScalaコードの紹介と、パフォーマンスの計測を行いました。測定結果を図にまとめると以下のようになります。
両対数グラフで傾きはほぼ1の直線になっているので、データ量に比例して取得にかかる時間が長くなるということがわかります。速さではAerospikeでList型を使うのが勝っていましたが、DynamoDBの場合は(お金さえ払えば)AWSがよしなに管理してくれるという利点がある(Aerospikeでは容量が足りなくなってサーバを追加したりサーバが故障したりした際にマイグレーション(クラスタ内でのデータの再分配)が起こったりして管理が結構大変…)ので、メリット・デメリットを把握した上でどのデータベースを使うのかを決めるといいと思います。