<acronym id="fq3wk"></acronym>
        1. <track id="fq3wk"></track>
          您的位置:首頁 > 要聞 >

          觀熱點:大數據NiFi(二十一):監控日志文件生產到Kafka

          2023-03-09 10:11:02 來源:騰訊云

          ?監控日志文件生產到Kafka

          案例:監控某個目錄下的文件內容,將消息生產到Kafka中。

          此案例使用到“TailFile”和“PublishKafka_1_0”處理器。


          (資料圖片)

          一、???????配置“TailFile”處理器

          創建“TailFile”處理器并配置:

          注意:以上需要在NiFi集群中的每個節點上創建“/root/test/logdata”文件,“logdata”是文件,而非目錄。

          二、配置“PublishKafka_1_0”處理器

          “PublishKafka_1_0”處理器作用是使用Kafka 1.0生產者API將FlowFile的內容作為消息發送給Apache Kafka。發送的內容可以是單獨的FlowFile,也可以通過用戶指定分隔符分割的FlowFile內容。

          關于“PublishKafka_1_0”處理器的“Properties”主要配置的說明如下:

          配置項

          默認值

          允許值

          描述

          Kafka Brokers(Kafka節點)

          localhost:9092

          逗號分割的Kafka集群Broker列表。格式:host:port

          Topic Name(topic 名稱)

          將消息生產到的Topic 名稱。

          Delivery Guarantee(數據傳遞保證)

          0

          指定保證消息被發送到Kafka的要求。對應Kafka的"acks"屬性??梢耘渲玫捻椚缦拢築est Effort (盡力交付,相當于ack=0):在向Kafka節點寫出消息后,FlowFile將被路由到成功,而不需要等待響應。這提供了最好的性能,但可能會導致數據丟失。例如:消息寫出到Kafka節點,但是對應節點掛掉,這時將消息路由到成功。Guarantee Single Node Delivery(保證單節點交付,相當于ack=1,Kafka中的默認配置):KafkaProducer把消息發送出去,至少要等待leader已經成功將數據寫入本地log,但是并沒有等待所有follower是否成功寫入。該情況下,如果follower沒有成功備份數據,而此時leader剛好又掛掉了,就會導致消息丟失。該選項就是如果消息被單個Kafka節點接收到,FlowFile將被路由到成功,無論它是否被復制,但如果Kafka節點崩潰,可能會導致數據丟失。Guarantee Replicated Delivery(保證復制交付,相當于ack=-1):FlowFile數據寫出后,Kafka topic ISR列表離跟leader保持同步的那些follower都要把消息同步過去,該消息才會被認為成功,否則路由到失敗。

          Use Transactions(使用事務)

          true

          ?true?false

          指定NiFi是否應該在與Kafka通信時提供事務性保證。如果發送數據到Kafka有問題,并且這個屬性設置為false,那么已經發送到Kafka的消息將繼續發送,并被傳遞給消費者。如果這個設置為true,那么Kafka事務將被回滾,這樣這些消息對消費者是不可用的。將此設置為true需要將屬性設置為"Guarantee Replicated Delivery"。

          Best Effort (盡力交付,相當于ack=0):

          在向Kafka節點寫出消息后,FlowFile將被路由到成功,而不需要等待響應。這提供了最好的性能,但可能會導致數據丟失。例如:消息寫出到Kafka節點,但是對應節點掛掉,這時將消息路由到成功。

          Guarantee Single Node Delivery(保證單節點交付,相當于ack=1,Kafka中的默認配置):

          KafkaProducer把消息發送出去,至少要等待leader已經成功將數據寫入本地log,但是并沒有等待所有follower是否成功寫入。該情況下,如果follower沒有成功備份數據,而此時leader剛好又掛掉了,就會導致消息丟失。該選項就是如果消息被單個Kafka節點接收到,FlowFile將被路由到成功,無論它是否被復制,但如果Kafka節點崩潰,可能會導致數據丟失。 Guarantee Replicated Delivery(保證復制交付,相當于ack=-1): FlowFile數據寫出后,Kafka topic ISR列表離跟leader保持同步的那些follower都要把消息同步過去,該消息才會被認為成功,否則路由到失敗。 Use Transactions(使用事務)true true false 指定NiFi是否應該在與Kafka通信時提供事務性保證。如果發送數據到Kafka有問題,并且這個屬性設置為false,那么已經發送到Kafka的消息將繼續發送,并被傳遞給消費者。如果這個設置為true,那么Kafka事務將被回滾,這樣這些消息對消費者是不可用的。將此設置為true需要將屬性設置為"Guarantee Replicated Delivery"。

          “PublishKafka_1_0”處理器配置如下:

          1、創建“PublishKafka_1_0”處理器

          2、配置“PROPERTIES”

          注意:以上topic 可以在Kafka中創建好,也可以執行時自動創建。

          3、連接“TailFile”處理器和“PublishKafka_1_0”處理器

          連接“TailFile”處理器和“PublishKafka_1_0”處理器,并設置“PublishKafka_1_0”處理器“failure”和“success”路由關系為自動終止。

          三、運行測試

          1、啟動Kafka集群,啟動NiFi處理流程

          2、向/root/test/logdata文件中寫入數據并保存

          向NiFi集群中的其中一臺節點的“logdata”中寫入以下數據即可

          [root@node1 test]# echo "hello world1" > /root/test/logdata[root@node1 test]# echo "hello world2" >> /root/test/logdata[root@node1 test]# echo "hello world3" >> /root/test/logdata

          3、查看Kafka中自動創建的“nifi_topic”中的數據

          以上數據每寫入一行,有個空行,這是由于“TailFile”處理器監控數據導致的,實際就是寫入了3條數據,可以通過后期業務處理時,對數據進行trim處理即可。

          關鍵詞:

          [責任編輯:xwzkw]

          相關閱讀

          欧美黄色