Untitled

 avatar
unknown
java
2 years ago
190 kB
5
Indexable
/lib/jvm/java-11-openjdk-amd64/bin/java -javaagent:/snap/intellij-idea-ultimate/405/lib/idea_rt.jar=39119:/snap/intellij-idea-ultimate/405/bin -Dfile.encoding=UTF-8 -classpath /home/igor/Projects/flink-engine/target/classes:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-actuator/2.2.6.RELEASE/spring-boot-starter-actuator-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter/2.2.6.RELEASE/spring-boot-starter-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot/2.2.6.RELEASE/spring-boot-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.2.6.RELEASE/spring-boot-autoconfigure-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.6.RELEASE/spring-boot-starter-logging-2.2.6.RELEASE.jar:/home/igor/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/home/igor/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/home/igor/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/home/igor/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/home/igor/.m2/repository/org/slf4j/jul-to-slf4j/1.7.30/jul-to-slf4j-1.7.30.jar:/home/igor/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/home/igor/.m2/repository/org/yaml/snakeyaml/1.25/snakeyaml-1.25.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-actuator-autoconfigure/2.2.6.RELEASE/spring-boot-actuator-autoconfigure-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-actuator/2.2.6.RELEASE/spring-boot-actuator-2.2.6.RELEASE.jar:/home/igor/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.10.3/jackson-databind-2.10.3.jar:/home/igor/.m2/repository/org/springframework/spring-context/5.2.5.RELEASE/spring-context-5.2.5.RELEASE.jar:/home/igor/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.10.3/jackson-datatype-jsr310-2.10.3.jar:/home/igor/.m2/repository/io/micrometer/micrometer-core/1.3.6/micrometer-core-1.3.6.jar:/home/igor/.m2/repository/org/hdrhistogram/HdrHistogram/2.1.11/HdrHistogram-2.1.11.jar:/home/igor/.m2/repository/org/latencyutils/LatencyUtils/2.0.3/LatencyUtils-2.0.3.jar:/home/igor/.m2/repository/org/objenesis/objenesis/2.6/objenesis-2.6.jar:/home/igor/.m2/repository/org/springframework/spring-core/5.2.5.RELEASE/spring-core-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/springframework/spring-jcl/5.2.5.RELEASE/spring-jcl-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/apache/flink/flink-java/1.11.2/flink-java-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-core/1.11.2/flink-core-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-annotations/1.11.2/flink-annotations-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-metrics-core/1.11.2/flink-metrics-core-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-shaded-asm-7/7.1-11.0/flink-shaded-asm-7-7.1-11.0.jar:/home/igor/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/home/igor/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/home/igor/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/igor/.m2/repository/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar:/home/igor/.m2/repository/org/apache/commons/commons-lang3/3.9/commons-lang3-3.9.jar:/home/igor/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/igor/.m2/repository/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar:/home/igor/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/igor/.m2/repository/org/apache/flink/force-shading/1.11.2/force-shading-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-streaming-java_2.11/1.11.2/flink-streaming-java_2.11-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-runtime_2.11/1.11.2/flink-runtime_2.11-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-queryable-state-client-java/1.11.2/flink-queryable-state-client-java-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-hadoop-fs/1.11.2/flink-hadoop-fs-1.11.2.jar:/home/igor/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/home/igor/.m2/repository/org/apache/flink/flink-shaded-netty/4.1.39.Final-11.0/flink-shaded-netty-4.1.39.Final-11.0.jar:/home/igor/.m2/repository/org/apache/flink/flink-shaded-jackson/2.10.1-11.0/flink-shaded-jackson-2.10.1-11.0.jar:/home/igor/.m2/repository/org/apache/flink/flink-shaded-zookeeper-3/3.4.14-11.0/flink-shaded-zookeeper-3-3.4.14-11.0.jar:/home/igor/.m2/repository/org/javassist/javassist/3.24.0-GA/javassist-3.24.0-GA.jar:/home/igor/.m2/repository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/igor/.m2/repository/com/typesafe/akka/akka-actor_2.11/2.5.21/akka-actor_2.11-2.5.21.jar:/home/igor/.m2/repository/com/typesafe/config/1.3.3/config-1.3.3.jar:/home/igor/.m2/repository/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/home/igor/.m2/repository/com/typesafe/akka/akka-stream_2.11/2.5.21/akka-stream_2.11-2.5.21.jar:/home/igor/.m2/repository/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar:/home/igor/.m2/repository/com/typesafe/ssl-config-core_2.11/0.3.7/ssl-config-core_2.11-0.3.7.jar:/home/igor/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.1.1/scala-parser-combinators_2.11-1.1.1.jar:/home/igor/.m2/repository/com/typesafe/akka/akka-protobuf_2.11/2.5.21/akka-protobuf_2.11-2.5.21.jar:/home/igor/.m2/repository/com/typesafe/akka/akka-slf4j_2.11/2.5.21/akka-slf4j_2.11-2.5.21.jar:/home/igor/.m2/repository/org/clapper/grizzled-slf4j_2.11/1.3.2/grizzled-slf4j_2.11-1.3.2.jar:/home/igor/.m2/repository/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar:/home/igor/.m2/repository/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar:/home/igor/.m2/repository/com/twitter/chill_2.11/0.7.6/chill_2.11-0.7.6.jar:/home/igor/.m2/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/home/igor/.m2/repository/org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar:/home/igor/.m2/repository/org/apache/flink/flink-shaded-guava/18.0-11.0/flink-shaded-guava-18.0-11.0.jar:/home/igor/.m2/repository/org/apache/commons/commons-collections4/4.4/commons-collections4-4.4.jar:/home/igor/.m2/repository/org/apache/flink/flink-connector-rabbitmq_2.11/1.11.2/flink-connector-rabbitmq_2.11-1.11.2.jar:/home/igor/.m2/repository/com/rabbitmq/amqp-client/4.2.0/amqp-client-4.2.0.jar:/home/igor/.m2/repository/commons-codec/commons-codec/1.13/commons-codec-1.13.jar:/home/igor/.m2/repository/com/activitystream/as-transformation-common/1.4.2/as-transformation-common-1.4.2.jar:/home/igor/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.10.3/jackson-annotations-2.10.3.jar:/home/igor/.m2/repository/com/activitystream/event-catalog-avro/34/event-catalog-avro-34.jar:/home/igor/.m2/repository/org/apache/avro/avro/1.9.2/avro-1.9.2.jar:/home/igor/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.10.3/jackson-core-2.10.3.jar:/home/igor/.m2/repository/com/google/guava/guava/25.1-android/guava-25.1-android.jar:/home/igor/.m2/repository/org/checkerframework/checker-compat-qual/2.0.0/checker-compat-qual-2.0.0.jar:/home/igor/.m2/repository/com/google/errorprone/error_prone_annotations/2.1.3/error_prone_annotations-2.1.3.jar:/home/igor/.m2/repository/com/google/j2objc/j2objc-annotations/1.1/j2objc-annotations-1.1.jar:/home/igor/.m2/repository/org/codehaus/mojo/animal-sniffer-annotations/1.14/animal-sniffer-annotations-1.14.jar:/home/igor/.m2/repository/org/apache/flink/flink-avro/1.11.2/flink-avro-1.11.2.jar:/home/igor/.m2/repository/com/google/inject/guice/4.2.2/guice-4.2.2.jar:/home/igor/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/home/igor/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/home/igor/.m2/repository/org/apache/flink/flink-clients_2.11/1.11.2/flink-clients_2.11-1.11.2.jar:/home/igor/.m2/repository/org/apache/flink/flink-optimizer_2.11/1.11.2/flink-optimizer_2.11-1.11.2.jar:/home/igor/.m2/repository/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar:/home/igor/.m2/repository/org/projectlombok/lombok/1.18.12/lombok-1.18.12.jar:/home/igor/.m2/repository/com/google/code/gson/gson/2.8.6/gson-2.8.6.jar:/home/igor/.m2/repository/joda-time/joda-time/2.10.10/joda-time-2.10.10.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.2.6.RELEASE/spring-boot-starter-web-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.2.6.RELEASE/spring-boot-starter-json-2.2.6.RELEASE.jar:/home/igor/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.3/jackson-datatype-jdk8-2.10.3.jar:/home/igor/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.10.3/jackson-module-parameter-names-2.10.3.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.2.6.RELEASE/spring-boot-starter-tomcat-2.2.6.RELEASE.jar:/home/igor/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.33/tomcat-embed-core-9.0.33.jar:/home/igor/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.33/tomcat-embed-el-9.0.33.jar:/home/igor/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.33/tomcat-embed-websocket-9.0.33.jar:/home/igor/.m2/repository/org/springframework/boot/spring-boot-starter-validation/2.2.6.RELEASE/spring-boot-starter-validation-2.2.6.RELEASE.jar:/home/igor/.m2/repository/jakarta/validation/jakarta.validation-api/2.0.2/jakarta.validation-api-2.0.2.jar:/home/igor/.m2/repository/org/hibernate/validator/hibernate-validator/6.0.18.Final/hibernate-validator-6.0.18.Final.jar:/home/igor/.m2/repository/org/jboss/logging/jboss-logging/3.4.1.Final/jboss-logging-3.4.1.Final.jar:/home/igor/.m2/repository/com/fasterxml/classmate/1.5.1/classmate-1.5.1.jar:/home/igor/.m2/repository/org/springframework/spring-web/5.2.5.RELEASE/spring-web-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/springframework/spring-beans/5.2.5.RELEASE/spring-beans-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/springframework/spring-webmvc/5.2.5.RELEASE/spring-webmvc-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/springframework/spring-aop/5.2.5.RELEASE/spring-aop-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/springframework/spring-expression/5.2.5.RELEASE/spring-expression-5.2.5.RELEASE.jar:/home/igor/.m2/repository/org/eclipse/jetty/jetty-server/9.4.45.v20220203/jetty-server-9.4.45.v20220203.jar:/home/igor/.m2/repository/javax/servlet/javax.servlet-api/4.0.1/javax.servlet-api-4.0.1.jar:/home/igor/.m2/repository/org/eclipse/jetty/jetty-http/9.4.27.v20200227/jetty-http-9.4.27.v20200227.jar:/home/igor/.m2/repository/org/eclipse/jetty/jetty-util/9.4.27.v20200227/jetty-util-9.4.27.v20200227.jar:/home/igor/.m2/repository/org/eclipse/jetty/jetty-io/9.4.27.v20200227/jetty-io-9.4.27.v20200227.jar com.activitystream.flinkengine.flink.runners.EventTransformationRunner --env-name dev11 --rmq-uri v2-mq-dev.activitystream.com --rmq-in-queue test-igor-generic-events --rmq-out-exchange test-igor-transformed --parallelism 1 --rmq-max-unacknowledged 1000 --tenant-label test
13:47:39,301 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
13:47:39,303 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
13:47:39,303 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/home/igor/Projects/flink-engine/target/classes/logback.xml]
13:47:39,392 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
13:47:39,392 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
13:47:39,399 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
13:47:39,405 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
13:47:39,425 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.FileAppender]
13:47:39,428 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [FILE]
13:47:39,429 |-ERROR in ch.qos.logback.core.joran.spi.Interpreter@10:20 - no applicable action for [withJansi], current ElementPath  is [[configuration][appender][withJansi]]
13:47:39,429 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
13:47:39,429 |-INFO in ch.qos.logback.core.FileAppender[FILE] - File property is set to [logs/tests.log]
13:47:39,431 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [com.activitystream.flinkengine.flink.connectors.rabbit] to DEBUG
13:47:39,431 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
13:47:39,431 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
13:47:39,431 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [FILE] to Logger[ROOT]
13:47:39,431 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
13:47:39,432 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@77ec78b9 - Registering current configuration as safe fallback point

[main] WARN  c.a.f.f.c.JobArguments - Starting EventTransformationJob with the following arguments:
envName='dev11'
rmqUri='v2-mq-dev.activitystream.com'
rmqMaxUnacknowledged=1000'
rmqInboundQueue='test-igor-generic-events'
rmqOutboundExchange='test-igor-transformed'
tenantLabel='test'
parallelism=1'
 
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.inject.internal.cglib.core.$ReflectUtils$1 (file:/home/igor/.m2/repository/com/google/inject/guice/4.2.2/guice-4.2.2.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of com.google.inject.internal.cglib.core.$ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[main] INFO  o.a.f.a.j.t.TypeExtractor - class com.activitystream.transformation.common.model.event.TransactionG does not contain a getter for field occurredAt 
[main] INFO  o.a.f.a.j.t.TypeExtractor - Class class com.activitystream.transformation.common.model.event.TransactionG cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 
[main] INFO  o.a.f.a.j.t.TypeExtractor - class com.activitystream.flinkengine.model.TransformationEvent does not contain a setter for field inputEvent 
[main] INFO  o.a.f.a.j.t.TypeExtractor - Class class com.activitystream.flinkengine.model.TransformationEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 
[main] INFO  o.a.f.r.t.TaskExecutorResourceUtils - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 
[main] INFO  o.a.f.r.m.MiniCluster - Starting Flink Mini Cluster 
[main] INFO  o.a.f.r.m.MiniCluster - Starting Metrics Registry 
[main] INFO  o.a.f.r.m.MetricRegistryImpl - No metrics reporter configured, no metrics will be exposed/reported. 
[main] INFO  o.a.f.r.m.MiniCluster - Starting RPC Service(s) 
[main] INFO  o.a.f.r.r.a.AkkaRpcServiceUtils - Trying to start local actor system 
[flink-akka.actor.default-dispatcher-2] INFO  a.e.s.Slf4jLogger - Slf4jLogger started 
[main] INFO  o.a.f.r.r.a.AkkaRpcServiceUtils - Actor system started at akka://flink 
[main] INFO  o.a.f.r.r.a.AkkaRpcServiceUtils - Trying to start local actor system 
[flink-metrics-2] INFO  a.e.s.Slf4jLogger - Slf4jLogger started 
[main] INFO  o.a.f.r.r.a.AkkaRpcServiceUtils - Actor system started at akka://flink-metrics 
[main] INFO  o.a.f.r.r.a.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService . 
[main] INFO  o.a.f.r.m.MiniCluster - Starting high-availability services 
[main] INFO  o.a.f.r.b.BlobServer - Created BLOB server storage directory /tmp/blobStore-05aada24-a1d6-4c0f-b57a-b5e109642908 
[main] INFO  o.a.f.r.b.BlobServer - Started BLOB server at 0.0.0.0:36197 - max concurrent requests: 50 - max backlog: 1000 
[main] INFO  o.a.f.r.b.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-5893b192-aa06-433a-9c9b-d5ca60e6e99d 
[main] INFO  o.a.f.r.b.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-9d715062-a342-4e1b-8286-e3565d162d95 
[main] INFO  o.a.f.r.m.MiniCluster - Starting 1 TaskManger(s) 
[main] INFO  o.a.f.r.t.TaskManagerRunner - Starting TaskManager with ResourceID: 794ef396-5047-4c6d-ae4b-98be9d81d4b1 
[main] INFO  o.a.f.r.t.TaskManagerServices - Temporary file directory '/tmp': total 467 GB, usable 367 GB (78.59% usable) 
[main] INFO  o.a.f.r.i.d.FileChannelManagerImpl - FileChannelManager uses directory /tmp/flink-io-80309fd4-fb2a-4561-aeac-4f2ba1e86113 for spill files. 
[main] INFO  o.a.f.r.i.d.FileChannelManagerImpl - FileChannelManager uses directory /tmp/flink-netty-shuffle-358e0564-e4f4-467c-8041-4c3bdabcc6fd for spill files. 
[main] INFO  o.a.f.r.i.n.b.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768). 
[main] INFO  o.a.f.r.i.n.NettyShuffleEnvironment - Starting the network environment and its components. 
[main] INFO  o.a.f.r.t.KvStateService - Starting the kvState service and its components. 
[main] INFO  o.a.f.r.r.a.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 . 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.DefaultJobLeaderService - Start job leader service. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.f.FileCache - User file cache uses directory /tmp/flink-dist-cache-b07b9972-d896-4913-b82b-a171ed7245d2 
[main] INFO  o.a.f.r.d.DispatcherRestEndpoint - Starting rest endpoint. 
[main] INFO  o.a.f.r.d.DispatcherRestEndpoint - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath. 
[main] WARN  o.a.f.r.w.WebMonitorUtils - Log file environment variable 'log.file' is not set. 
[main] WARN  o.a.f.r.w.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'. 
[main] INFO  o.a.f.r.d.DispatcherRestEndpoint - Rest endpoint listening at localhost:45037 
[main] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Proposing leadership to contender http://localhost:45037 
[mini-cluster-io-thread-1] INFO  o.a.f.r.d.DispatcherRestEndpoint - http://localhost:45037 was granted leadership with leaderSessionID=8658aa54-b27e-4ed6-8783-3f34a7e104ae 
[mini-cluster-io-thread-1] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Received confirmation of leadership for leader http://localhost:45037 , session=8658aa54-b27e-4ed6-8783-3f34a7e104ae 
[main] INFO  o.a.f.r.r.a.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 . 
[main] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Proposing leadership to contender LeaderContender: DefaultDispatcherRunner 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Proposing leadership to contender LeaderContender: StandaloneResourceManager 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.r.StandaloneResourceManager - ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token b61b31a2f0d20998eb24bceff405491b 
[main] INFO  o.a.f.r.m.MiniCluster - Flink Mini Cluster started successfully 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.r.s.SlotManagerImpl - Starting the SlotManager. 
[mini-cluster-io-thread-2] INFO  o.a.f.r.d.r.SessionDispatcherLeaderProcess - Start SessionDispatcherLeaderProcess. 
[mini-cluster-io-thread-5] INFO  o.a.f.r.d.r.SessionDispatcherLeaderProcess - Recover all persisted job graphs. 
[mini-cluster-io-thread-5] INFO  o.a.f.r.d.r.SessionDispatcherLeaderProcess - Successfully recovered 0 persisted job graphs. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=eb24bcef-f405-491b-b61b-31a2f0d20998 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.t.TaskExecutor - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(b61b31a2f0d20998eb24bceff405491b). 
[mini-cluster-io-thread-5] INFO  o.a.f.r.r.a.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 . 
[mini-cluster-io-thread-5] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=ce0dff35-9322-4c77-a6be-be66f28d0a4f 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.t.TaskExecutor - Resolved ResourceManager address, beginning registration 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.r.StandaloneResourceManager - Registering TaskManager with ResourceID 794ef396-5047-4c6d-ae4b-98be9d81d4b1 (akka://flink/user/rpc/taskmanager_0) at ResourceManager 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.TaskExecutor - Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 5ebfd0e693b5acfe13e2aea8e1e30756. 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.d.StandaloneDispatcher - Received JobGraph submission 5d38174275cad7bdc77108f56221d9db (event_transformation-test-dev11). 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.d.StandaloneDispatcher - Submitting job 5d38174275cad7bdc77108f56221d9db (event_transformation-test-dev11). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.r.a.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 . 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - Initializing job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=500) for event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - Running initialization on master for job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - Successfully ran initialization on master in 0 ms. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.s.a.DefaultExecutionTopology - Built 1 pipelined regions in 0 ms 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@c7341d9 for event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3 
[mini-cluster-io-thread-11] INFO  o.a.f.r.j.JobManagerRunnerImpl - JobManager runner for job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) was granted leadership with session id d2a8d8b1-9cd8-4b24-8673-bf46640c5e93 at akka://flink/user/rpc/jobmanager_3. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.j.JobMaster - Starting execution of job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) under job master id 8673bf46640c5e93d2a8d8b19cd84b24. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.j.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state CREATED to RUNNING. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from CREATED to SCHEDULED. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) of job 5d38174275cad7bdc77108f56221d9db is not in state RUNNING but SCHEDULED instead. Aborting checkpoint. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.j.s.SlotPoolImpl - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{44bacbf72ccd16ada3f9926ee9de6e2c}] 
[jobmanager-future-thread-1] INFO  o.a.f.r.h.n.e.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=d2a8d8b1-9cd8-4b24-8673-bf46640c5e93 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.j.JobMaster - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(b61b31a2f0d20998eb24bceff405491b) 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.j.JobMaster - Resolved ResourceManager address, beginning registration 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.r.StandaloneResourceManager - Registering job manager 8673bf46640c5e93d2a8d8b19cd84b24@akka://flink/user/rpc/jobmanager_3 for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.r.StandaloneResourceManager - Registered job manager 8673bf46640c5e93d2a8d8b19cd84b24@akka://flink/user/rpc/jobmanager_3 for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.JobMaster - JobManager successfully registered at ResourceManager, leader id: b61b31a2f0d20998eb24bceff405491b. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.j.s.SlotPoolImpl - Requesting new slot [SlotRequestId{44bacbf72ccd16ada3f9926ee9de6e2c}] and profile ResourceProfile{UNKNOWN} from resource manager. 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.r.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 5d38174275cad7bdc77108f56221d9db with allocation id 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.t.TaskExecutor - Receive slot request 04e3b04c392b16c2f0a2010854b4eb70 for job 5d38174275cad7bdc77108f56221d9db from resource manager with leader id b61b31a2f0d20998eb24bceff405491b. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.t.TaskExecutor - Allocated slot for 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.t.DefaultJobLeaderService - Add job 5d38174275cad7bdc77108f56221d9db for job leader monitoring. 
[mini-cluster-io-thread-14] INFO  o.a.f.r.t.DefaultJobLeaderService - Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id d2a8d8b1-9cd8-4b24-8673-bf46640c5e93. 
[flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.t.DefaultJobLeaderService - Resolved JobManager address, beginning registration 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.DefaultJobLeaderService - Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.TaskExecutor - Establish JobManager connection for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.TaskExecutor - Offer reserved slots to the leader of job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #0) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) [DEPLOYING]. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1678193264344 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2). 
[flink-akka.actor.default-dispatcher-4] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) a3b8e518d348e7f5b1719b4b4a4ac1d2. 
[flink-akka.actor.default-dispatcher-18] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (a3b8e518d348e7f5b1719b4b4a4ac1d2) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50e4d1f8. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-18] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-18] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-18] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #1) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-13] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-18] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1678193323411 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff). 
[flink-akka.actor.default-dispatcher-24] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) fbe24af2d8465c972d51e3d2169ce3ff. 
[flink-akka.actor.default-dispatcher-27] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (fbe24af2d8465c972d51e3d2169ce3ff) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@74ea652b. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-27] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-27] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-27] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #2) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-29] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1678193384216 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7). 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 73f69d1875ba27bce04a7783337675c7. 
[flink-akka.actor.default-dispatcher-40] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (73f69d1875ba27bce04a7783337675c7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c0e2bc. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-40] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-40] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-40] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #3) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-41] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 4 (type=CHECKPOINT) @ 1678193444450 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4). 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) b171c547d5d3798f1d04c4870818a0b4. 
[flink-akka.actor.default-dispatcher-54] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (b171c547d5d3798f1d04c4870818a0b4) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@232d9470. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-54] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-54] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-54] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #4) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-52] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 5 (type=CHECKPOINT) @ 1678193505904 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857). 
[flink-akka.actor.default-dispatcher-65] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 283a6c70ce092cfc6c955611b2304857. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (283a6c70ce092cfc6c955611b2304857) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@64c69ef3. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #5) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-58] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 6 (type=CHECKPOINT) @ 1678193567583 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681). 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 7e9750be10d2a929ba4dee9a921e5681. 
[flink-akka.actor.default-dispatcher-74] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (7e9750be10d2a929ba4dee9a921e5681) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@6c314ccd. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-74] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-74] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-74] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #6) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-75] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 7 (type=CHECKPOINT) @ 1678193627764 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44). 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 865f159dfbe371826679d7d73505cb44. 
[flink-akka.actor.default-dispatcher-84] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (865f159dfbe371826679d7d73505cb44) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@7714b6a4. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-84] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-84] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-84] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #7) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-89] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 8 (type=CHECKPOINT) @ 1678193688167 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0). 
[flink-akka.actor.default-dispatcher-102] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 21bc13bbac6ab5adc0ce32b91e7dccf0. 
[flink-akka.actor.default-dispatcher-99] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (21bc13bbac6ab5adc0ce32b91e7dccf0) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@88b6a76. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-99] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-99] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-99] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #8) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-103] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9 (type=CHECKPOINT) @ 1678193748515 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815). 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 20842906e256e1963f8e4777a2e7b815. 
[flink-akka.actor.default-dispatcher-114] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (20842906e256e1963f8e4777a2e7b815) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@438439ce. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-114] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-114] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-114] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #9) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-112] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 10 (type=CHECKPOINT) @ 1678193808548 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86). 
[flink-akka.actor.default-dispatcher-123] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) 632488ab7b9d2d630863d58843692f86. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (632488ab7b9d2d630863d58843692f86) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3b0d2a86. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to RESTARTING. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RESTARTING to RUNNING. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from CREATED to SCHEDULED. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from SCHEDULED to DEPLOYING. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Deploying Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (attempt #10) to 794ef396-5047-4c6d-ae4b-98be9d81d4b1 @ localhost (dataPort=-1) 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Activate slot 04e3b04c392b16c2f0a2010854b4eb70. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.t.TaskExecutor - Received task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1). 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from CREATED to DEPLOYING. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Loading JAR files for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Registering task at network: Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) [DEPLOYING]. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.s.r.t.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from DEPLOYING to RUNNING. 
[flink-akka.actor.default-dispatcher-125] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from DEPLOYING to RUNNING. 
[Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 11 (type=CHECKPOINT) @ 1678193870528 for job 5d38174275cad7bdc77108f56221d9db. 
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] ERROR o.a.f.s.r.t.StreamTask - Error during disposal of stream operator. 
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:180)
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:98)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:188)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] WARN  o.a.f.r.t.Task - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1)] INFO  o.a.f.r.t.Task - Freeing task resources for Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53). 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) d0218b1490852d809feb639e3c8a2c53. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Map -> (Filter -> Map -> Sink: Publish failed transaction, Filter -> Map -> Sink: Publish transformed Event) (1/1) (d0218b1490852d809feb639e3c8a2c53) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@5e4eedc0. 
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.e.f.f.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task f0bd8a29f4afd2e5cc43b41a168f6ab5_0.  
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state RUNNING to FAILING. 
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=500)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
	at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.e.ExecutionGraph - Job event_transformation-test-dev11 (5d38174275cad7bdc77108f56221d9db) switched from state FAILING to FAILED. 
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=500)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
	at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster 
[flink-akka.actor.default-dispatcher-136] INFO  o.a.f.r.d.StandaloneDispatcher - Job 5d38174275cad7bdc77108f56221d9db reached globally terminal state FAILED. 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/rpc/taskmanager_0. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.TaskExecutor - Close ResourceManager connection fa210f55ed2d4bfa2907efd7343bbcda. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.j.JobMaster - Stopping the JobMaster for job event_transformation-test-dev11(5d38174275cad7bdc77108f56221d9db). 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.TaskExecutor - Close JobManager connection for job 5d38174275cad7bdc77108f56221d9db. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.j.s.SlotPoolImpl - Suspending SlotPool. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.j.JobMaster - Close ResourceManager connection fa210f55ed2d4bfa2907efd7343bbcda: JobManager is shutting down.. 
[flink-akka.actor.default-dispatcher-136] INFO  o.a.f.r.r.StandaloneResourceManager - Closing TaskExecutor connection 794ef396-5047-4c6d-ae4b-98be9d81d4b1 because: The TaskExecutor is shutting down. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.j.s.SlotPoolImpl - Stopping SlotPool. 
[flink-akka.actor.default-dispatcher-136] INFO  o.a.f.r.r.StandaloneResourceManager - Disconnect job manager 8673bf46640c5e93d2a8d8b19cd84b24@akka://flink/user/rpc/jobmanager_3 for job 5d38174275cad7bdc77108f56221d9db from the resource manager. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.s.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationId: 04e3b04c392b16c2f0a2010854b4eb70, jobId: 5d38174275cad7bdc77108f56221d9db). 
[mini-cluster-io-thread-15] INFO  o.a.f.r.t.TaskExecutor - JobManager for job 5d38174275cad7bdc77108f56221d9db with leader id 8673bf46640c5e93d2a8d8b19cd84b24 lost leadership. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.DefaultJobLeaderService - Stop job leader service. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager. 
[main] ERROR c.a.f.f.j.EventTransformationJob -  
java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1717)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
	at com.activitystream.flinkengine.flink.jobs.EventTransformationJob.startJob(EventTransformationJob.java:73)
	at com.activitystream.flinkengine.flink.runners.EventTransformationRunner.main(EventTransformationRunner.java:20)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=500)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
	at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	... 4 common frames omitted
Caused by: java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:171)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.setupConnection(RMQSink.java:153)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:161)
	... 11 common frames omitted
[ForkJoinPool.commonPool-worker-5] INFO  o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui 
[ForkJoinPool.commonPool-worker-5] INFO  o.a.f.r.d.DispatcherRestEndpoint - Shut down complete. 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.r.StandaloneResourceManager - Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed.. 
[ForkJoinPool.commonPool-worker-5] INFO  o.a.f.r.e.c.DispatcherResourceManagerComponent - Closing components. 
[ForkJoinPool.commonPool-worker-5] INFO  o.a.f.r.d.r.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.d.StandaloneDispatcher - Stopping dispatcher akka://flink/user/rpc/dispatcher_2. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.d.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka://flink/user/rpc/dispatcher_2. 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.r.s.SlotManagerImpl - Closing the SlotManager. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.r.h.l.b.BackPressureRequestCoordinator - Shutting down back pressure request coordinator. 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.r.s.SlotManagerImpl - Suspending the SlotManager. 
[flink-akka.actor.default-dispatcher-134] INFO  o.a.f.r.d.StandaloneDispatcher - Stopped dispatcher akka://flink/user/rpc/dispatcher_2. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.i.d.FileChannelManagerImpl - FileChannelManager removed spill file directory /tmp/flink-io-80309fd4-fb2a-4561-aeac-4f2ba1e86113 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.i.n.NettyShuffleEnvironment - Shutting down the network environment and its components. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.i.d.FileChannelManagerImpl - FileChannelManager removed spill file directory /tmp/flink-netty-shuffle-358e0564-e4f4-467c-8041-4c3bdabcc6fd 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.KvStateService - Shutting down the kvState service and its components. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.DefaultJobLeaderService - Stop job leader service. 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.f.FileCache - removed file cache directory /tmp/flink-dist-cache-b07b9972-d896-4913-b82b-a171ed7245d2 
[flink-akka.actor.default-dispatcher-137] INFO  o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/rpc/taskmanager_0. 
[AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] INFO  o.a.f.r.r.a.AkkaRpcService - Stopping Akka RPC service. 
[flink-metrics-14] INFO  o.a.f.r.r.a.AkkaRpcService - Stopping Akka RPC service. 
[flink-metrics-14] INFO  o.a.f.r.r.a.AkkaRpcService - Stopped Akka RPC service. 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:36197 
[flink-akka.actor.default-dispatcher-133] INFO  o.a.f.r.r.a.AkkaRpcService - Stopped Akka RPC service. 

Process finished with exit code 0
Editor is loading...