How to Use the Camel AWS S3 SourceConnector to Read S3 Files into a Kafka Topic

Author: IT 大叔 | Published: 2020-09-20

Apache Camel

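Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine. It provides a Java-object-based implementation of the Enterprise Integration Patterns, using an application programming interface to configure routing and mediation rules.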

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively scalable, distributed, high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

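The main components are:

Kafka Broker
A messaging broker responsible for delivering records from producing clients to consuming clients.

Apache ZooKeeper is a core dependency of Kafka that provides cluster coordination services for highly reliable distributed coordination.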

AMQ Streams Architecture

[Figure: AMQ Streams architecture]

Camel Kafka Connector

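Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors.

It is a "Camel Kafka connector adapter" that aims to provide a user-friendly way to use all Apache Camel components in Kafka Connect.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. For more information about Kafka Connect, see here.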

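Prerequisites

For this demo, you will need the following set up in your development environment:

  1. Apache Maven 3.6.3 or later
  2. JDK 11 installed
  3. A Kafka cluster
  4. An AWS account, with files available in an S3 bucket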

In this article, we demonstrate how to read files from an S3 bucket and write them to a Kafka topic using the CamelAWSS3SourceConnector.

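Preparing the pieces needed for the example

Set the plugin.path property in your Kafka installation

Open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to a directory of your choice: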

plugin.path=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/

In this example, we use /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors.
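If you would rather script this step than edit the file by hand, the following POSIX shell sketch is one way to do it. The function name set_plugin_path is hypothetical, not something shipped with Kafka; it removes any existing plugin.path line and appends the new value, so running it twice still leaves a single entry.

```shell
# Hypothetical helper (not part of Kafka): set or replace plugin.path in a
# Kafka Connect worker properties file such as connect-standalone.properties.
# Usage: set_plugin_path <properties-file> <plugin-directory>
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  tmp_file="${props_file}.tmp.$$"

  # Copy every line except active plugin.path entries; commented-out
  # "#plugin.path=..." lines are kept because they do not start with "plugin.path".
  grep -v '^plugin\.path=' "$props_file" > "$tmp_file"
  # Append the new value, then replace the original file.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  mv "$tmp_file" "$props_file"
}

# Example:
# set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" \
#   /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors
```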

Download the camel-aws-s3-kafka-connector package and unzip it inside the plugin.path directory:

wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.4.0/camel-aws-s3-kafka-connector-0.4.0-package.zip
unzip camel-aws-s3-kafka-connector-0.4.0-package.zip
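Kafka Connect only discovers connectors located under plugin.path, so it can be worth confirming that the archive was unpacked there. The sketch below assumes the unzipped package contains JAR files whose names start with camel-aws-s3-kafka-connector; the helper has_connector_jars is hypothetical.

```shell
# Hypothetical helper: succeed if the directory tree contains at least one
# file matching the given name pattern (default: any JAR).
# Usage: has_connector_jars <plugin.path directory> [name-pattern]
has_connector_jars() {
  plugin_dir=$1
  pattern=${2:-*.jar}
  # find lists matching files; grep -q . succeeds only if the list is non-empty.
  find "$plugin_dir" -type f -name "$pattern" | grep -q .
}

# Example, run from the plugin.path directory:
# has_connector_jars . 'camel-aws-s3-kafka-connector*.jar' && echo "plugin found"
```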

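Configure the connector in the file CamelAWSS3SourceConnector.properties, replacing the access key, secret key, region, and bucket ARN with your own values: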

name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
camel.source.maxPollDuration=10000
topics=mytopic
camel.component.aws-s3.access-key=xxxxx
camel.component.aws-s3.secret-key=yyyyyy
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.endpoint.autocloseBody=true
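Before starting the worker, a quick sanity check can catch a missing or empty setting. This hypothetical check_connector_props helper only tests that each listed key appears with a non-empty value; the key list in the usage comment is taken from the file above and is this sketch's assumption, not a list the connector itself enforces.

```shell
# Hypothetical helper: check that a properties file defines each key with a
# non-empty value. Prints the offending keys to stderr.
# Usage: check_connector_props <file> <key>...
check_connector_props() {
  props_file=$1
  shift
  missing=0
  for key in "$@"; do
    # Escape dots so they match literally in the regex.
    escaped=$(printf '%s' "$key" | sed 's/\./\\./g')
    # Require "key=" at the start of a line followed by at least one character.
    if ! grep -q "^${escaped}=..*" "$props_file"; then
      echo "missing or empty: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example:
# check_connector_props CamelAWSS3SourceConnector.properties \
#   name connector.class topics \
#   camel.component.aws-s3.access-key camel.component.aws-s3.secret-key \
#   camel.component.aws-s3.region camel.source.path.bucketNameOrArn
```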

Start ZooKeeper and the Kafka server (from $KAFKA_HOME, each in its own terminal):

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

Create the topic:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
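To confirm the topic exists before starting the connector, you can pipe the output of kafka-topics.sh --list into a small check (kafka-topics.sh --describe --topic mytopic also shows its partitions and replication factor). The topic_listed helper below is hypothetical.

```shell
# Hypothetical helper: read topic names (one per line, as printed by
# kafka-topics.sh --list) from stdin and succeed if the given topic is present.
# -x matches whole lines only; -F treats the topic name literally, not as a regex.
topic_listed() {
  grep -qxF -- "$1"
}

# Example:
# ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | topic_listed mytopic \
#   && echo "mytopic exists"
```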

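Now start the connector with the standalone Connect worker, passing the worker configuration and the connector properties file: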

./bin/connect-standalone.sh /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/connect-standalone.properties  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connector-examples/examples/aws-s3/CamelAWSS3SourceConnector.properties
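Once the worker is running, the Kafka Connect REST API reports the state of the connector and its tasks at GET /connectors/<name>/status. The sketch below assumes the standalone worker's REST listener is on the default port 8083 and that the response is compact JSON; all_running is a hypothetical helper and a grep-based check, not a real JSON parser.

```shell
# Hypothetical helper: read a Kafka Connect status document from stdin and
# succeed only if every "state" field (connector and tasks) is RUNNING.
# Assumes compact JSON such as {"connector":{"state":"RUNNING",...},"tasks":[...]}.
all_running() {
  # Extract each state value, one per line.
  states=$(grep -o '"state":"[A-Z]*"' | sed 's/.*:"\([A-Z]*\)"/\1/')
  # Fail if no state field was found at all.
  [ -n "$states" ] || return 1
  # Fail if any state is something other than RUNNING.
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Example:
# curl -s http://localhost:8083/connectors/CamelAWSS3SourceConnector/status \
#   | all_running && echo "connector and tasks are running"
```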

Now upload any file to the S3 bucket "kkakarla-test-kafka-connector".
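One way to upload a test file is the AWS CLI's aws s3 cp command. The hypothetical s3_uri_from_arn helper derives the s3:// URI from the bucket ARN used in camel.source.path.bucketNameOrArn; it assumes the arn:aws:s3::: prefix shown in the configuration and passes a plain bucket name through unchanged.

```shell
# Hypothetical helper: convert arn:aws:s3:::<bucket> into s3://<bucket>/ for the AWS CLI.
s3_uri_from_arn() {
  # Strip the literal ARN prefix if present.
  bucket=${1#arn:aws:s3:::}
  printf 's3://%s/\n' "$bucket"
}

# Example (requires the AWS CLI and credentials with access to the bucket):
# echo "hi hello how are you" > hello.txt
# aws s3 cp hello.txt "$(s3_uri_from_arn arn:aws:s3:::kkakarla-test-kafka-connector)"
```

Note that the connector's configuration log reports camel.source.endpoint.deleteAfterRead = true, so the consumer is expected to delete each object from the bucket after reading it. Upload a copy you can afford to lose, or set camel.source.endpoint.deleteAfterRead=false in the connector properties.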

[2020-09-01 12:43:17,149] INFO Kafka version: 2.5.0.redhat-00003 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-09-01 12:43:17,149] INFO Kafka commitId: f960e3745ec74111 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-09-01 12:43:17,149] INFO Kafka startTimeMs: 1598944397149 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-09-01 12:43:17,156] INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask:77)
[2020-09-01 12:43:17,156] INFO [Producer clientId=connector-producer-CamelAWSS3SourceConnector-0] Cluster ID: jmyQzmm2QUe12p8is0zNAQ (org.apache.kafka.clients.Metadata:280)
[2020-09-01 12:43:17,156] INFO Created connector CamelAWSS3SourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-09-01 12:43:17,157] INFO CamelAwss3SourceConnectorConfig values: 
    camel.component.aws-s3.accelerateModeEnabled = false
    camel.component.aws-s3.accessKey = null
    camel.component.aws-s3.amazonS3Client = null
    camel.component.aws-s3.autoCreateBucket = true
    camel.component.aws-s3.autocloseBody = true
    camel.component.aws-s3.basicPropertyBinding = false
    camel.component.aws-s3.bridgeErrorHandler = false
    camel.component.aws-s3.chunkedEncodingDisabled = false
    camel.component.aws-s3.configuration = null
    camel.component.aws-s3.deleteAfterRead = true
    camel.component.aws-s3.delimiter = null
    camel.component.aws-s3.dualstackEnabled = false
    camel.component.aws-s3.encryptionMaterials = null
    camel.component.aws-s3.endpointConfiguration = null
    camel.component.aws-s3.fileName = null
    camel.component.aws-s3.forceGlobalBucketAccessEnabled = false
    camel.component.aws-s3.includeBody = true
    camel.component.aws-s3.pathStyleAccess = false
    camel.component.aws-s3.payloadSigningEnabled = false
    camel.component.aws-s3.policy = null
    camel.component.aws-s3.prefix = null
    camel.component.aws-s3.proxyHost = null
    camel.component.aws-s3.proxyPort = null
    camel.component.aws-s3.proxyProtocol = HTTPS
    camel.component.aws-s3.region = US_EAST_2
    camel.component.aws-s3.secretKey = null
    camel.component.aws-s3.useEncryption = false
    camel.component.aws-s3.useIAMCredentials = false
    camel.source.camelMessageHeaderKey = null
    camel.source.component = aws-s3
    camel.source.contentLogLevel = OFF
    camel.source.endpoint.accelerateModeEnabled = false
    camel.source.endpoint.accessKey = null
    camel.source.endpoint.amazonS3Client = null
    camel.source.endpoint.autoCreateBucket = true
    camel.source.endpoint.autocloseBody = true
    camel.source.endpoint.backoffErrorThreshold = null
    camel.source.endpoint.backoffIdleThreshold = null
    camel.source.endpoint.backoffMultiplier = null
    camel.source.endpoint.basicPropertyBinding = false
    camel.source.endpoint.bridgeErrorHandler = false
    camel.source.endpoint.chunkedEncodingDisabled = false
    camel.source.endpoint.delay = 500
    camel.source.endpoint.deleteAfterRead = true
    camel.source.endpoint.delimiter = null
    camel.source.endpoint.dualstackEnabled = false
    camel.source.endpoint.encryptionMaterials = null
    camel.source.endpoint.endpointConfiguration = null
    camel.source.endpoint.exceptionHandler = null
    camel.source.endpoint.exchangePattern = null
    camel.source.endpoint.fileName = null
    camel.source.endpoint.forceGlobalBucketAccessEnabled = false
    camel.source.endpoint.greedy = false
    camel.source.endpoint.includeBody = true
    camel.source.endpoint.initialDelay = 1000
    camel.source.endpoint.maxConnections = 60
    camel.source.endpoint.maxMessagesPerPoll = 10
    camel.source.endpoint.pathStyleAccess = false
    camel.source.endpoint.payloadSigningEnabled = false
    camel.source.endpoint.policy = null
    camel.source.endpoint.pollStrategy = null
    camel.source.endpoint.prefix = null
    camel.source.endpoint.proxyHost = null
    camel.source.endpoint.proxyPort = null
    camel.source.endpoint.proxyProtocol = HTTPS
    camel.source.endpoint.region = null
    camel.source.endpoint.repeatCount = 0
    camel.source.endpoint.runLoggingLevel = TRACE
    camel.source.endpoint.scheduledExecutorService = null
    camel.source.endpoint.scheduler = none
    camel.source.endpoint.schedulerProperties = null
    camel.source.endpoint.secretKey = null
    camel.source.endpoint.sendEmptyMessageWhenIdle = false
    camel.source.endpoint.startScheduler = true
    camel.source.endpoint.synchronous = false
    camel.source.endpoint.timeUnit = MILLISECONDS
    camel.source.endpoint.useEncryption = false
    camel.source.endpoint.useFixedDelay = true
    camel.source.endpoint.useIAMCredentials = false
    camel.source.marshal = null
    camel.source.maxBatchPollSize = 1000
    camel.source.maxPollDuration = 10000
    camel.source.path.bucketNameOrArn = arn:aws:s3:::kkakarla-test-kafka-connector
    camel.source.pollingConsumerBlockTimeout = 0
    camel.source.pollingConsumerBlockWhenFull = true
    camel.source.pollingConsumerQueueSize = 1000
    camel.source.unmarshal = null
    camel.source.url = null
    topics = mytopic
 (org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnectorConfig:347)
[2020-09-01 12:43:17,239] INFO Setting initial properties in Camel context: [{connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector, camel.source.endpoint.autocloseBody=true, camel.source.maxPollDuration=10000, topics=mytopic, camel.component.aws-s3.region=US_EAST_2, camel.source.component=aws-s3, task.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceTask, camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector, camel.component.aws-s3.access-key=xxxxxxxx, name=CamelAWSS3SourceConnector, value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter, camel.component.aws-s3.secret-key=xxxxxxxxxxxx, key.converter=org.apache.kafka.connect.storage.StringConverter}] (org.apache.camel.kafkaconnector.utils.CamelMainSupport:91)
[2020-09-01 12:43:17,244] INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport:463)
[2020-09-01 12:43:17,271] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector:126)
[2020-09-01 12:43:17,272] INFO No additional Camel XML rests discovered from: classpath:camel-rest/*.xml (org.apache.camel.main.DefaultRoutesCollector:162)
[2020-09-01 12:43:17,285] INFO Creating Camel route from({}) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:102)
[2020-09-01 12:43:17,285] INFO .to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:130)
[2020-09-01 12:43:17,299] INFO Starting CamelContext (org.apache.camel.kafkaconnector.utils.CamelMainSupport:138)
[2020-09-01 12:43:17,360] INFO Apache Camel 3.4.2 (camel-1) is starting (org.apache.camel.impl.engine.AbstractCamelContext:2630)
[2020-09-01 12:43:17,361] INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html (org.apache.camel.impl.engine.AbstractCamelContext:2773)
[2020-09-01 12:43:19,848] INFO Route: route1 started and consuming from: aws-s3://arn:aws:s3:::kkakarla-test-kafka-connector (org.apache.camel.impl.engine.InternalRouteStartupManager:158)

The file's contents are written to the Kafka topic mytopic:

[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mytopic
hi hello how are you

 
