The output-kafka plugin that ships with Logstash 2.x only supports Kafka 0.8.x, but in practice we may need to write to Kafka 0.9.x. To do that, the logstash-output-kafka plugin has to be upgraded to the 3.x series.
Install the dependencies
yum -y install ruby rubygems ruby-devel
gem sources --add https://ruby.taobao.org/ --remove http://rubygems.org/
gem install jar-dependencies -v '0.3.4'
gem install ruby-maven -v '3.3.11'
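To confirm both gems landed before moving on (a quick sanity check; the versions shown will depend on your environment):
gem list | grep -E 'jar-dependencies|ruby-maven'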
Upgrade output-kafka
/usr/local/logstash/bin/logstash-plugin update logstash-output-kafka
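After the update finishes, it is worth confirming the installed plugin version; logstash-plugin list --verbose prints each plugin together with its version, so the output should now show logstash-output-kafka at 3.x:
/usr/local/logstash/bin/logstash-plugin list --verbose | grep logstash-output-kafka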
Starting Logstash now prints the following warnings:
./logstash -f /usr/local/logstash/conf/kafka.conf
Settings: Default pipeline workers: 8
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Pipeline main started
Solution
The pipeline still starts, but the warnings appear because the Kafka 0.9 producer client logs through log4j and nothing in Logstash's JVM has configured log4j, so the client's own log output is being discarded. The fix is to patch the plugin's kafka.rb so that it initializes log4j through Logstash's own logger when the output registers.
1. Change to the /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/ directory:
cd /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/
2. Back up the existing kafka.rb file:
mv kafka.rb{,.backup}
3. Create a new kafka.rb file with the following content:
require 'logstash/namespace'
require 'logstash/outputs/base'
require 'jruby-kafka'
# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages with not only the message but also with a timestamp and
# hostname. If you do not want anything but your message passing through, you should make the output
# configuration something like:
# [source,ruby]
#     output {
#       kafka {
#         codec => plain {
#           format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'

  default :codec, 'json'

  # The topic to produce messages to
  config :topic_id, :validate => :string, :required => true

  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
  config :bootstrap_servers, :validate => :string, :default => 'localhost:9092'

  # Serializer class for the key of the message
  config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'

  # Serializer class for the value of the message
  config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'

  # The key that will be included with the record
  #
  # If a `message_key` is present, a partition will be chosen using a hash of the key.
  # If not present, a partition for the message will be assigned in a round-robin fashion.
  config :message_key, :validate => :string

  # The number of acknowledgments the producer requires the leader to have received
  # before considering a request complete.
  #
  # acks=0: the producer will not wait for any acknowledgment from the server at all.
  # acks=1: the leader will write the record to its local log but
  # will respond without awaiting full acknowledgement from all followers.
  # acks=all: the leader will wait for the full set of in-sync replicas to acknowledge the record.
  config :acks, :validate => ["0", "1", "all"], :default => "1"

  # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
  config :buffer_memory, :validate => :number, :default => 33554432

  # The compression type for all data generated by the producer.
  # The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
  config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none"

  # Setting a value greater than zero will cause the client to
  # resend any record whose send fails with a potentially transient error.
  config :retries, :validate => :number, :default => 0

  # The producer will attempt to batch records together into fewer requests whenever multiple
  # records are being sent to the same partition. This helps performance on both the client
  # and the server. This configuration controls the default batch size in bytes.
  config :batch_size, :validate => :number, :default => 16384

  # The id string to pass to the server when making requests.
  # The purpose of this is to be able to track the source of requests beyond just
  # ip/port by allowing a logical application name to be included with the request.
  config :client_id, :validate => :string

  # The producer groups together any records that arrive in between request
  # transmissions into a single batched request. Normally this occurs only under
  # load when records arrive faster than they can be sent out. However, in some circumstances
  # the client may want to reduce the number of requests even under moderate load.
  # This setting accomplishes this by adding a small amount of artificial delay; that is,
  # rather than immediately sending out a record the producer will wait for up to the given delay
  # to allow other records to be sent so that the sends can be batched together.
  config :linger_ms, :validate => :number, :default => 0

  # The maximum size of a request
  config :max_request_size, :validate => :number, :default => 1048576

  # The size of the TCP receive buffer to use when reading data
  config :receive_buffer_bytes, :validate => :number, :default => 32768

  # The size of the TCP send buffer to use when sending data.
  config :send_buffer_bytes, :validate => :number, :default => 131072

  # This configuration controls the maximum amount of time the server will wait for acknowledgments
  # from followers to meet the acknowledgment requirements the producer has specified with the
  # acks configuration. If the requested number of acknowledgments is not met when the timeout
  # elapses, an error will be returned. This timeout is measured on the server side and does not
  # include the network latency of the request.
  config :timeout_ms, :validate => :number, :default => 30000

  # When our memory buffer is exhausted we must either stop accepting new
  # records (block) or throw errors. By default this setting is true and we block;
  # however, in some scenarios blocking is not desirable and it is better to immediately give an error.
  config :block_on_buffer_full, :validate => :boolean, :default => true

  # The timeout setting for the initial metadata request to fetch topic metadata.
  config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000

  # The max time in milliseconds before a metadata refresh is forced.
  config :metadata_max_age_ms, :validate => :number, :default => 300000

  # The amount of time to wait before attempting to reconnect to a given host when a connection fails.
  config :reconnect_backoff_ms, :validate => :number, :default => 10

  # The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
  config :retry_backoff_ms, :validate => :number, :default => 100

  public
  def register
    # Route the Kafka client's log4j output through Logstash's own logger; this
    # is what makes the "No appenders could be found" warnings go away.
    LogStash::Logger.setup_log4j(@logger)

    options = {
      :key_serializer => @key_serializer,
      :value_serializer => @value_serializer,
      :bootstrap_servers => @bootstrap_servers,
      :acks => @acks,
      :buffer_memory => @buffer_memory,
      :compression_type => @compression_type,
      :retries => @retries,
      :batch_size => @batch_size,
      :client_id => @client_id,
      :linger_ms => @linger_ms,
      :max_request_size => @max_request_size,
      :receive_buffer_bytes => @receive_buffer_bytes,
      :send_buffer_bytes => @send_buffer_bytes,
      :timeout_ms => @timeout_ms,
      :block_on_buffer_full => @block_on_buffer_full,
      :metadata_fetch_timeout_ms => @metadata_fetch_timeout_ms,
      :metadata_max_age_ms => @metadata_max_age_ms,
      :reconnect_backoff_ms => @reconnect_backoff_ms,
      :retry_backoff_ms => @retry_backoff_ms
    }
    @producer = Kafka::KafkaProducer.new(options)
    @producer.connect

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :bootstrap_servers => @bootstrap_servers)

    @codec.on_event do |event, data|
      begin
        key = if @message_key.nil? then nil else event.sprintf(@message_key) end
        @producer.send_msg(event.sprintf(@topic_id), nil, key, data)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting',
                     :exception => e)
      end
    end
  end # def register

  def receive(event)
    if event == LogStash::SHUTDOWN
      return
    end
    @codec.encode(event)
  end

  def close
    @producer.close
  end
end # class LogStash::Outputs::Kafka
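With the patched plugin in place, a minimal output block for kafka.conf looks something like the sketch below. The topic name and broker address are placeholders for illustration; topic_id is the only required option, and bootstrap_servers falls back to localhost:9092 when omitted:
output {
  kafka {
    topic_id => "logstash"
    bootstrap_servers => "localhost:9092"
  }
}
Restart Logstash with ./logstash -f /usr/local/logstash/conf/kafka.conf. Because register now initializes log4j through LogStash::Logger.setup_log4j, the "No appenders could be found" warnings should no longer appear.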