# _*_ coding: utf-8 _*_
import time
from multiprocessing import Process, Queue
from jpype import *
import bigdata.mq.conf.settings_MQ as settings
from bigdata.mq.consumer.consumer_form_where import get_from_where
from bigdata.mq.consumer.consumer_config import ConsumerConfig
class PushConsumer:
"""
push方式消费RocketMQ
"""
def __init__(self, server, topic, group_id):
"""
初始化push consumer
:param consumer_config: ConsumerConfig对象
:param messageHandler: MessageHandlerInterface实现类
"""
self.config = ConsumerConfig(server, topic, group_id)
self.queue = Queue()
def _get_message(self):
"""
启动jvm,接受RocketMQ数据
:param consumer_config:
:param messageHandler:
:return:
"""
# 获取默认jvm路径,并启动jvm
jvmPath = getDefaultJVMPath()
startJVM(
jvmPath,
settings.JVM_HEAP_XMS,
settings.JVM_HEAP_XMX,
settings.JVM_HEAP_XMN,
settings.JAVA_EXT_DIRS
)
# 获取java com.clife.utils.mq.consumer.ConsumerConfig2对象,并初始化
ConsumerConfig2 = JPackage("com.clife.utils.mq.consumer").ConsumerConfig2
consumerConfig2 = ConsumerConfig2()
consumerConfig2.setNamesrvAddr(self.config.namesrv_addr)
consumerConfig2.setTopic(self.config.topic)
consumerConfig2.setGroupName(self.config.group_name)
consumerConfig2.setTags(self.config.tag)
consumerConfig2.setWhereFrom(get_from_where(self.config.from_where))
# 定义java接收到数据后的回调,将数据加载到Queue中
def handle(msg):
num = msg.size()
n = 0
while n < num:
message = msg.get(JInt(n)).getMessage()
n = n + 1
self.queue.put(message)
fun = {"handle": handle}
# 利用代理,定义java接口实现类
proxy = JProxy("com.clife.utils.mq.consumer.handler.ConsumerMsgHandler", dict=fun)
Consumer = JClass("com.clife.utils.mq.consumer.Consumer")
consumer = Consumer(consumerConfig2, proxy)
consumer.init()
# 阻塞当前进程
while True:
time.sleep(10)
def start(self):
self.process = Process(target=self._get_message)
self.process.start()
def close(self):
self.process.terminate()
def __iter__(self):
while True:
message = self.queue.get()
yield message
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
rocketmq-client.zip (42个子文件)
bigdata
conf
logback.xml 1KB
__init__.py 0B
mq
consumer
consumer_form_where.py 1KB
message_handler_interface.py 304B
message_handler.py 388B
push_consumer.py 3KB
__init__.py 23B
consumer_config.py 776B
conf
settings_MQ.py 452B
__init__.py 23B
producer
producer.py 2KB
producer_config.py 254B
__init__.py 23B
record.py 742B
__init__.py 23B
example
test.py 2KB
push_consumer_example.py 286B
__init__.py 23B
producer_example.py 348B
__init__.py 0B
lib
guava-12.0.1.jar 1.71MB
logback-core-1.2.3.jar 461KB
slf4j-api-1.7.6.jar 28KB
rocketmq-common-4.2.0.jar 260KB
netty-all-4.0.42.Final.jar 2.21MB
spring-expression-4.0.6.RELEASE.jar 201KB
spring-beans-4.0.6.RELEASE.jar 655KB
spring-core-4.0.6.RELEASE.jar 954KB
spring-context-support-4.0.6.RELEASE.jar 132KB
logback-classic-1.2.3.jar 284KB
clife-utils-mq-0.0.16-SNAPSHOT.jar 64KB
jcl-over-slf4j-1.7.6.jar 16KB
rocketmq-remoting-4.2.0.jar 112KB
rocketmq-client-4.2.0.jar 279KB
clife-data-empty-1.0-SNAPSHOT.jar 3KB
commons-lang3-3.4.jar 424KB
spring-aop-4.0.6.RELEASE.jar 345KB
jsr305-1.3.9.jar 32KB
fastjson-1.2.35.jar 470KB
log4j-over-slf4j-1.7.6.jar 23KB
aopalliance-1.0.jar 4KB
spring-context-4.0.6.RELEASE.jar 955KB
共 42 条
- 1
资源评论
凡尔Issac
- 粉丝: 3
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 5G网络基础培训课件.zip
- 2024-spring-HIT-CS-大作业
- yolo目标检测项目实验
- downloadFile-1.hc
- C++课程设计:基于Qt的航班信息管理系统
- ADS7822UVerilog驱动,前面传的有点问题
- 基于python的高性能爬虫程序,使用了多线程+缓存+xpath实现的,这里以彼-岸图库为例,实现,仅用于学习交流
- 中分辨率成像光谱仪(MODIS)烧毁面积产品信息MODIS-C6-BA-User-Guide-1.2.pdf
- Screenshot_20240427_172613_com.huawei.browser.jpg
- 关于学习Python的相关资源网站链接及相关介绍.docx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功