开发者社区> 问答> 正文

Apache Flink:Python流API中的Kafka连接器,“无法加载用户类”

我正在尝试使用Flink的新Python流API并尝试运行我的脚本./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py。python脚本相当简单,我只是尝试使用现有主题并将所有内容发送到stdout(或默认情况下输出方法发出数据的日志目录中的* .out文件)。

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:

for jar in glob.glob(os.path.join(directory,'*.jar')):
            sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",

      "group_id": "flink_test",
      "topics": ["TopicCategory-TopicName"]}

props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):

consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)

env = factory.get_execution_environment()
env.add_java_source(consumer) \
    .output()
env.execute()

我抓住的jar文件一把从Maven的回购协议,即flink-connector-kafka-0.9_2.11-1.6.1.jar,flink-connector-kafka-base_2.11-1.6.1.jar与kafka-clients-0.9.0.1.jar和弗林克的复制他们lib的目录。除非我误解了文档,否则这应该足以让Flink加载kafka连接器。实际上,如果我删除任何这些jar导入失败,但这似乎不足以实际调用该计划。添加for循环以动态添加这些sys.path也不起作用。这是在控制台中打印的内容:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
File "", line 1, in
File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main

at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
这是我在日志中看到的:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:

file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)

Class not resolvable through given classloader.

at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

有没有办法解决这个问题并使连接器可用于Python?我怀疑这是Jython的类加载器问题,但我不知道如何进一步调查(也鉴于我不了解Java)。

展开
收起
flink小助手 2018-12-06 18:02:09 11326 0
3 条回答
写回答
取消 提交回答
  • 应该是集群环境的问题,你的代码里虽然把jar加到了classpath,但是只在driver上被执行了,job提交到别的node的taskmanager上时,会把对象反序列化出来,这个时候就找不到类了。我临时解决方案就是修改pyflink-stream.sh文件, 需要保证每个node上都有这些对应位置的jar存在。

    
    /opt/apps/flink/flink run --classpath file:///opt/apps/flink/kafka/flink-connector-kafka-0.10_2.11-1.8.0.jar --classpath file:///opt/apps/flink/kafka/flink-connector-kafka-0.9_2.11-1.8.0.jar --classpath file:///opt/apps/flink/kafka/flink-connector-kafka-base_2.11-1.8.0.jar --classpath file:///opt/apps/flink/kafka/snappy-java-1.1.2.6.jar --classpath file:///opt/apps/flink/kafka/force-shading-1.8.0.jar --classpath file:///opt/apps/flink/kafka/kafka-clients-0.10.2.1.jar --classpath file:///opt/apps/flink/kafka/lz4-1.3.0.jar --class org.apache.flink.streaming.python.api.PythonStreamBinder -v /opt/apps/flink/opt/flink-streaming-python*.jar "$@"
    

    有些变量被我写死了,可以恢复回去。

    2019-07-17 23:18:38
    赞同 展开评论 打赏
  • 您好,我遇到了类似的问题,请问您解决了吗?

    2019-07-17 23:18:38
    赞同 展开评论 打赏
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    你在这里使用错误的Kafka消费者。在你的代码中,它是FlinkKafkaConsumer09,但你正在使用的lib是flink-connector-kafka-0.11_2.11-1.6.1.jar,用于FlinkKafkaConsumer011。尝试替换FlinkKafkaConsumer09它FlinkKafkaConsumer011,或使用lib文件flink-connector-kafka-0.9_2.11-1.6.1.jar而不是当前文件。

    2019-07-17 23:18:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像