embulkからAPACHE KAFKA にpushしてみた

embulkからAPACHE KAFKA にpushしてみた。

といっても、QUICKSTARTだけですが。

kafka.apache.org

基本は上記の通りやりました。

kafka_2.13-3.7.0.tgzをダウンロードしてdocker imageで展開して使ってみました。

root@05493a980797:/work/kafka_2.13-3.7.0# bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

QUICKSTART通り、でした。

ここからが本題です。

embulkを使ってkafkaにメッセージをpushしてみました。

randomjでランダムにデータを1000件登録します。

in:
  type: randomj
  rows: 1000
  threads: 1
  primary_key: myid
  schema:
    - {name: myid,     type: long}
    - {name: named,    type: string}
    - {name: named_s,  type: string, length: 8}
    - {name: x_flag,   type: boolean}
    - {name: rate,     type: double, max_value: 100, min_value: -100}
    - {name: score,    type: long, max_value: 255, min_value: 100}
    - {name: time,     type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y/%m/%d'}
out:
  type: kafka
  topic: "quickstart-events"
  serialize_format: json
  brokers:
    - "localhost:9092"

結果です。(一部省略)

{"myid":908,"named":"2Flfx52dngeMd8qXWd0HL74frAG8MUf4","named_s":"2UZSKL2H","x_flag":false,"rate":21.18840075995608,"score":220,"time":"2024-07-27T03:23:39Z","purchase":"2024-06-25T22:34:45Z"}
{"myid":909,"named":"s6sei5apbrMzUv4OSOzOH2ExFLCp2N7B","named_s":"kEIDAKm9","x_flag":true,"rate":16.56382703365203,"score":129,"time":"2024-07-07T03:50:18Z","purchase":"2024-05-08T23:01:53Z"}
{"myid":910,"named":"9Pc4x787a4sWpPGqzV6OQZVSqIvzO0rS","named_s":"KwQ2Eznr","x_flag":true,"rate":-73.35854935057456,"score":172,"time":"2024-07-21T13:35:40Z","purchase":"2024-07-23T19:32:20Z"}
{"myid":413,"named":"rKAsCV8eLJfMcZZttVLdoRGCIlxppXJb","named_s":"SaZiEkCc","x_flag":false,"rate":-73.23947830054233,"score":170,"time":"2024-06-01T11:10:07Z","purchase":"2024-05-17T22:16:38Z"}
{"myid":414,"named":"XDC42GogdTv6lrNCXgrclcqZROAUQHkU","named_s":"3MnMDUjQ","x_flag":false,"rate":76.38507681698047,"score":232,"time":"2024-05-16T19:21:39Z","purchase":"2024-06-20T08:24:36Z"}
...

一瞬でkafkaに登録できました。 randomjじゃなくてデータベースから取ってくれば簡単にpush通知ができそうな気がしてきました。もちろんkafkaじゃなくてredisのpub/subでもできます。 メリットデメリットでどちらを使うかを決めたら良いと思いますが、kafkaにpushする実装をembulkに任せられたら、楽になるかもしれませんね。

GWだったのでちょっとやってみたみたいなことをやってみました。