Logstash是一个开源的服务器端数据处理管道,可以同时从多个来源采集数据,转换数据,然后将数据发送到您指定的存储库中。

问题(response code: 429)

[2020-06-29T06:43:29,092][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 429 ({"type"=>"es_rejected_execution_exception", "reason"=>"rejected execution of processing of [27244][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[battle_button_click][4]] containing [34] requests, target allocation id: EEQebttSTiqKX9C0vXCPow, primary term: 1 on EsThreadPoolExecutor[name = ca3bffaea82ad29f996babdf68c41520/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@13cb1fbf[Running, pool size = 2, active threads = 2, queued tasks = 207, completed tasks = 1357]]"})

解决

1.缓解异常,修改elasticsearch.yml,放大write queue
thread_pool:
    write:
        size: 30
        queue_size: 1000

2.放大es的threadpool只能缓解,最终的解决都是用消息缓冲kafka等