PyFlink state management requires key_by

Keyed state such as MapState is only accessible from functions that run on a KeyedStream; calling get_map_state from a function applied to a plain DataStream fails at runtime. The script below demonstrates the failure, and the fix follows the traceback.

[root@master pyflink]# cat process_log.py
# -*- coding: utf-8 -*-
import re
from datetime import datetime

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

# Create the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Read the log file as a DataStream
data_stream = env.read_text_file('/root/pyflink/elink_test.txt')

# Event emitted downstream: a log line enriched with its bus_seq and target index

class LogEvent:
    def __init__(self, bus_seq, message, index_name):
        self.bus_seq = bus_seq
        self.message = message
        self.index_name = index_name

    def to_dict(self):
        return {
            "bus_seq": self.bus_seq,
            "message": self.message,
            "index_name": self.index_name
        }

class MyMapFunction(FlatMapFunction):

    def open(self, runtime_context: RuntimeContext):
        # Keyed MapState: process id -> bus_seq. Only accessible on a KeyedStream.
        self.process_id_to_bus_seq = runtime_context.get_map_state(
            MapStateDescriptor('process_id', Types.STRING(), Types.STRING()))

    def flat_map(self, line):
        if not line.startswith("ES"):
            return
        if '<Serial>' in line:
            try:
                pat = re.compile(r"<Serial>(\d+)</Serial>")
                bus_seq = pat.search(line).group(1)
                process_id = line.split()[1]
                self.process_id_to_bus_seq.put(process_id, bus_seq)
            except Exception:
                return
        process_id = line.split()[1]
        if len(process_id) != 6:
            process_id = line.split()[2]
        bus_seq = self.process_id_to_bus_seq.get(process_id)
        if not bus_seq:
            return
        date_str = datetime.now().strftime("%Y-%m-%d")
        index_name = 'flink-test' + date_str
        log_event = LogEvent(bus_seq, line, index_name)
        yield log_event.to_dict()

    
env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

date_str = datetime.now().strftime("%Y-%m-%d")
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('flink-test' + date_str)) \
    .set_hosts(['127.0.0.1:9200']) \
    .build()

# Apply the stateful function and write the results to Elasticsearch.
# NOTE: data_stream is not keyed here, which is exactly what triggers the error below.
new_stream = data_stream.flat_map(MyMapFunction(),
                                  output_type=Types.MAP(Types.STRING(), Types.STRING())).sink_to(es7_sink)

# For debugging, print to the console instead:
# new_stream.print()

# Execute the job
env.execute('Add "bus_seq" to each line')

[root@master pyflink]# python process_log.py
Traceback (most recent call last):
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 627, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 459, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 871, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in create_execution_tree
    return collections.OrderedDict([(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 927, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 814, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    transform_consumers = {
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 814, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 911, in get_operation
    return transform_factory.create_operation(
  File "/usr/local/python38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1206, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 136, in create_data_stream_keyed_process_function
    return _create_user_defined_function_operation(
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 222, in _create_user_defined_function_operation
    return beam_operation_cls(
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 234, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 92, in open
    self.open_func()
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 180, in open_func
    user_defined_func.open(runtime_context)
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/datastream/data_stream.py", line 345, in open
    self._open_func(runtime_context)
  File "process_log.py", line 51, in open
    self.process_id_to_bus_seq = runtime_context.get_map_state(MapStateDescriptor('process_id', Types.STRING(), Types.STRING()))
  File "/usr/local/python38/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 134, in get_map_state
    raise Exception("This state is only accessible by functions executed on a KeyedStream.")
Exception: This state is only accessible by functions executed on a KeyedStream.
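
The last frame says it all: get_map_state and the other keyed-state accessors only work inside functions executing on a KeyedStream, so the stream must be partitioned with key_by before the stateful flat_map is applied. Below is a minimal sketch of the fix; the key selector is an assumption (it keys each line by the same process-id token the state lookup uses), so adapt it to the real log layout:

# Key the stream by process id so that keyed state is available in open()/flat_map()
new_stream = data_stream \
    .key_by(lambda line: line.split()[1], key_type=Types.STRING()) \
    .flat_map(MyMapFunction(), output_type=Types.MAP(Types.STRING(), Types.STRING())) \
    .sink_to(es7_sink)

Once the stream is keyed by process id, the state is already scoped per key, so a ValueState holding just the bus_seq would be enough; MapState is kept in the sketch only to stay close to the original code.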
