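How to use the Camel AWS S3 SourceConnector to read S3 files into a Kafka topic.
Apache Camel
Apache Camel is an open-source framework for message-oriented middleware. It has a rule-based routing and mediation engine whose routing and mediation rules are configured through an application programming interface, giving a Java-object-based implementation of the Enterprise Integration Patterns.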
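Red Hat AMQ Streams
Red Hat AMQ Streams is a massively scalable, distributed, high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.
The main components include:
- Kafka broker: the messaging broker responsible for delivering records from producing clients to consuming clients.
- Apache ZooKeeper: a core dependency of Kafka that provides a cluster coordination service for highly reliable distributed coordination.
AMQ Streams architecture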
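Camel Kafka Connector
Camel Kafka Connector lets you use any Camel component as a Kafka Connect connector.
It is an adapter (the "Camel Kafka Connector adapter") that aims to provide a user-friendly way to use all Apache Camel components in Kafka Connect.
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. For more information about Kafka Connect, see the Apache Kafka documentation.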
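Prerequisites
For this demo you need the following set up in your development environment:
- Apache Maven 3.6.3 or later
- JDK 11 installed
- A Kafka cluster
- An AWS account, with files available in an S3 bucket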
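Before going further it can help to confirm that the command-line tools from the list above are on the PATH. The following is a minimal sketch; the function name check_tools is made up for illustration and is not part of Kafka or Camel.

```shell
# Report whether each required command-line tool is on the PATH.
# Returns 0 if all tools are present, 1 if any is missing.
check_tools() {
    missing=0
    for tool in "$@"; do
        if command -v "$tool" >/dev/null 2>&1; then
            echo "found:   $tool"
        else
            echo "missing: $tool"
            missing=1
        fi
    done
    return "$missing"
}

# Usage (then print the versions to compare them with the list above):
#   check_tools mvn java && mvn -version && java -version
```

The function only checks that the tools exist. Comparing the reported versions against Maven 3.6.3 and JDK 11 is left to the reader.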
In this article we demonstrate how to read files from an S3 bucket and write them to a Kafka topic using CamelAWSS3SourceConnector.
Preparing the required bits to develop the example
Set the plugin.path property in your Kafka installation
Open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to a location of your choice:
plugin.path=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/
In this example we use /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors.
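If you prefer to script the change rather than edit the file by hand, something along these lines should work. It is a sketch only: set_plugin_path is a hypothetical helper, and it assumes the worker file holds plugin.path as a single active or commented-out line that can be safely replaced.

```shell
# Set (or add) plugin.path in a Kafka Connect worker properties file.
#   $1 = path to the properties file, $2 = plugin directory
set_plugin_path() {
    props_file=$1
    plugin_dir=$2
    # Drop any existing "plugin.path=" or "#plugin.path=" line ...
    grep -Ev '^#?plugin\.path=' "$props_file" > "$props_file.tmp" || true
    # ... then append the new value and replace the original file.
    echo "plugin.path=$plugin_dir" >> "$props_file.tmp"
    mv "$props_file.tmp" "$props_file"
}

# Usage:
#   set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" \
#       /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/
```

All other lines of the properties file are kept in their original order. Only the plugin.path entry moves to the end.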
Download 'camel-aws-s3-kafka-connector' and unzip it into the plugin.path directory
wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.4.0/camel-aws-s3-kafka-connector-0.4.0-package.zip
unzip camel-aws-s3-kafka-connector-0.4.0-package.zip
Configure the properties in the file CamelAWSS3SourceConnector.properties
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
camel.source.maxPollDuration=10000
topics=mytopic
camel.component.aws-s3.access-key=xxxxx
camel.component.aws-s3.secret-key=yyyyyy
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.endpoint.autocloseBody=true
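To avoid typing real credentials into a file that may end up in version control, the same properties can be generated from the environment. This is a sketch under assumptions: write_s3_source_props is a hypothetical helper, the keys are read from the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, and the region is kept at US_EAST_2 as in the example above.

```shell
# Write the connector properties, taking the credentials from the environment
# (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY) instead of hard-coding them.
#   $1 = output file, $2 = bucket name or ARN, $3 = Kafka topic
write_s3_source_props() {
    # Abort with an error message if either variable is unset or empty.
    : "${AWS_ACCESS_KEY_ID:?must be set}" "${AWS_SECRET_ACCESS_KEY:?must be set}"
    cat > "$1" <<EOF
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
camel.source.maxPollDuration=10000
topics=$3
camel.component.aws-s3.access-key=$AWS_ACCESS_KEY_ID
camel.component.aws-s3.secret-key=$AWS_SECRET_ACCESS_KEY
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=$2
camel.source.endpoint.autocloseBody=true
EOF
    # The file now contains secrets, so restrict it to the current user.
    chmod 600 "$1"
}

# Usage:
#   write_s3_source_props CamelAWSS3SourceConnector.properties \
#       arn:aws:s3:::kkakarla-test-kafka-connector mytopic
```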
Start ZooKeeper and the Kafka server (each in its own terminal)
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
Create the topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
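To confirm that the topic exists before starting the connector, you can describe it with the same kafka-topics.sh script. The wrapper below is only a sketch: describe_topic is a hypothetical helper, and it assumes KAFKA_HOME points at the Kafka installation (falling back to the current directory).

```shell
# Describe a topic to confirm it exists with the expected partition count.
#   $1 = topic name, $2 = bootstrap server (defaults to localhost:9092)
describe_topic() {
    "${KAFKA_HOME:-.}/bin/kafka-topics.sh" --describe \
        --bootstrap-server "${2:-localhost:9092}" --topic "$1"
}

# Usage:
#   describe_topic mytopic
```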
Now start the connector
./bin/connect-standalone.sh \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/connect-standalone.properties \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connector-examples/examples/aws-s3/CamelAWSS3SourceConnector.properties
Now upload any file to the S3 bucket "kkakarla-test-kafka-connector".
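One way to do the upload from a terminal is the AWS CLI. The sketch below assumes the aws command is installed and configured with credentials that can write to the bucket. upload_sample and the object name sample.txt are made up for illustration.

```shell
# Create a small text file and copy it into the bucket with the AWS CLI.
#   $1 = bucket name (without the arn:aws:s3::: prefix)
upload_sample() {
    sample=$(mktemp)
    echo "hi hello how are you" > "$sample"
    aws s3 cp "$sample" "s3://$1/sample.txt"
    rm -f "$sample"
}

# Usage:
#   upload_sample kkakarla-test-kafka-connector
```

The configuration dump in the connector log lists deleteAfterRead = true, so the object is likely to be removed from the bucket once the connector has consumed it.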
[2020-09-01 12:43:17,149] INFO Kafka version: 2.5.0.redhat-00003 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-09-01 12:43:17,149] INFO Kafka commitId: f960e3745ec74111 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-09-01 12:43:17,149] INFO Kafka startTimeMs: 1598944397149 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-09-01 12:43:17,156] INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask:77)
[2020-09-01 12:43:17,156] INFO [Producer clientId=connector-producer-CamelAWSS3SourceConnector-0] Cluster ID: jmyQzmm2QUe12p8is0zNAQ (org.apache.kafka.clients.Metadata:280)
[2020-09-01 12:43:17,156] INFO Created connector CamelAWSS3SourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-09-01 12:43:17,157] INFO CamelAwss3SourceConnectorConfig values:
    camel.component.aws-s3.accelerateModeEnabled = false
    camel.component.aws-s3.accessKey = null
    camel.component.aws-s3.amazonS3Client = null
    camel.component.aws-s3.autoCreateBucket = true
    camel.component.aws-s3.autocloseBody = true
    camel.component.aws-s3.basicPropertyBinding = false
    camel.component.aws-s3.bridgeErrorHandler = false
    camel.component.aws-s3.chunkedEncodingDisabled = false
    camel.component.aws-s3.configuration = null
    camel.component.aws-s3.deleteAfterRead = true
    camel.component.aws-s3.delimiter = null
    camel.component.aws-s3.dualstackEnabled = false
    camel.component.aws-s3.encryptionMaterials = null
    camel.component.aws-s3.endpointConfiguration = null
    camel.component.aws-s3.fileName = null
    camel.component.aws-s3.forceGlobalBucketAccessEnabled = false
    camel.component.aws-s3.includeBody = true
    camel.component.aws-s3.pathStyleAccess = false
    camel.component.aws-s3.payloadSigningEnabled = false
    camel.component.aws-s3.policy = null
    camel.component.aws-s3.prefix = null
    camel.component.aws-s3.proxyHost = null
    camel.component.aws-s3.proxyPort = null
    camel.component.aws-s3.proxyProtocol = HTTPS
    camel.component.aws-s3.region = US_EAST_2
    camel.component.aws-s3.secretKey = null
    camel.component.aws-s3.useEncryption = false
    camel.component.aws-s3.useIAMCredentials = false
    camel.source.camelMessageHeaderKey = null
    camel.source.component = aws-s3
    camel.source.contentLogLevel = OFF
    camel.source.endpoint.accelerateModeEnabled = false
    camel.source.endpoint.accessKey = null
    camel.source.endpoint.amazonS3Client = null
    camel.source.endpoint.autoCreateBucket = true
    camel.source.endpoint.autocloseBody = true
    camel.source.endpoint.backoffErrorThreshold = null
    camel.source.endpoint.backoffIdleThreshold = null
    camel.source.endpoint.backoffMultiplier = null
    camel.source.endpoint.basicPropertyBinding = false
    camel.source.endpoint.bridgeErrorHandler = false
    camel.source.endpoint.chunkedEncodingDisabled = false
    camel.source.endpoint.delay = 500
    camel.source.endpoint.deleteAfterRead = true
    camel.source.endpoint.delimiter = null
    camel.source.endpoint.dualstackEnabled = false
    camel.source.endpoint.encryptionMaterials = null
    camel.source.endpoint.endpointConfiguration = null
    camel.source.endpoint.exceptionHandler = null
    camel.source.endpoint.exchangePattern = null
    camel.source.endpoint.fileName = null
    camel.source.endpoint.forceGlobalBucketAccessEnabled = false
    camel.source.endpoint.greedy = false
    camel.source.endpoint.includeBody = true
    camel.source.endpoint.initialDelay = 1000
    camel.source.endpoint.maxConnections = 60
    camel.source.endpoint.maxMessagesPerPoll = 10
    camel.source.endpoint.pathStyleAccess = false
    camel.source.endpoint.payloadSigningEnabled = false
    camel.source.endpoint.policy = null
    camel.source.endpoint.pollStrategy = null
    camel.source.endpoint.prefix = null
    camel.source.endpoint.proxyHost = null
    camel.source.endpoint.proxyPort = null
    camel.source.endpoint.proxyProtocol = HTTPS
    camel.source.endpoint.region = null
    camel.source.endpoint.repeatCount = 0
    camel.source.endpoint.runLoggingLevel = TRACE
    camel.source.endpoint.scheduledExecutorService = null
    camel.source.endpoint.scheduler = none
    camel.source.endpoint.schedulerProperties = null
    camel.source.endpoint.secretKey = null
    camel.source.endpoint.sendEmptyMessageWhenIdle = false
    camel.source.endpoint.startScheduler = true
    camel.source.endpoint.synchronous = false
    camel.source.endpoint.timeUnit = MILLISECONDS
    camel.source.endpoint.useEncryption = false
    camel.source.endpoint.useFixedDelay = true
    camel.source.endpoint.useIAMCredentials = false
    camel.source.marshal = null
    camel.source.maxBatchPollSize = 1000
    camel.source.maxPollDuration = 10000
    camel.source.path.bucketNameOrArn = arn:aws:s3:::kkakarla-test-kafka-connector
    camel.source.pollingConsumerBlockTimeout = 0
    camel.source.pollingConsumerBlockWhenFull = true
    camel.source.pollingConsumerQueueSize = 1000
    camel.source.unmarshal = null
    camel.source.url = null
    topics = mytopic
 (org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnectorConfig:347)
[2020-09-01 12:43:17,239] INFO Setting initial properties in Camel context: [{connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector, camel.source.endpoint.autocloseBody=true, camel.source.maxPollDuration=10000, topics=mytopic, camel.component.aws-s3.region=US_EAST_2, camel.source.component=aws-s3, task.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceTask, camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector, camel.component.aws-s3.access-key=xxxxxxxx, name=CamelAWSS3SourceConnector, value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter, camel.component.aws-s3.secret-key=xxxxxxxxxxxx, key.converter=org.apache.kafka.connect.storage.StringConverter}] (org.apache.camel.kafkaconnector.utils.CamelMainSupport:91)
[2020-09-01 12:43:17,244] INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport:463)
[2020-09-01 12:43:17,271] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector:126)
[2020-09-01 12:43:17,272] INFO No additional Camel XML rests discovered from: classpath:camel-rest/*.xml (org.apache.camel.main.DefaultRoutesCollector:162)
[2020-09-01 12:43:17,285] INFO Creating Camel route from({}) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:102)
[2020-09-01 12:43:17,285] INFO .to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:130)
[2020-09-01 12:43:17,299] INFO Starting CamelContext (org.apache.camel.kafkaconnector.utils.CamelMainSupport:138)
[2020-09-01 12:43:17,360] INFO Apache Camel 3.4.2 (camel-1) is starting (org.apache.camel.impl.engine.AbstractCamelContext:2630)
[2020-09-01 12:43:17,361] INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html (org.apache.camel.impl.engine.AbstractCamelContext:2773)
[2020-09-01 12:43:19,848] INFO Route: route1 started and consuming from: aws-s3://arn:aws:s3:::kkakarla-test-kafka-connector (org.apache.camel.impl.engine.InternalRouteStartupManager:158)
The contents of the file are written to the Kafka topic mytopic:
[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mytopic
hi hello how are you