package com.zero.ddd.akka.event.publisher2.actor.broker;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import com.google.common.cache.LoadingCache;
import com.zero.ddd.akka.cluster.core.initializer.config.BlockingIODispatcherSelector;
import com.zero.ddd.akka.cluster.core.initializer.serializer.SelfProtoBufObject;
import com.zero.ddd.akka.event.publisher2.actor.ServiceKeyHolder;
import com.zero.ddd.akka.event.publisher2.actor.consumer.EventSynchConsuemr.BatchPartitionEventCommand;
import com.zero.ddd.akka.event.publisher2.actor.consumer.EventSynchConsuemr.EventSynchConsuemrEvent;
import com.zero.ddd.akka.event.publisher2.actor.consumer.EventSynchConsuemr.PartitionEventCommand;
import com.zero.ddd.akka.event.publisher2.domain.partitionEvent.PartitionEvent;
import com.zero.ddd.akka.event.publisher2.domain.partitionEvent.PartitionEventStore;
import com.zero.ddd.akka.event.publisher2.domain.synchronizerState.PartitionAssignState;
import com.zero.ddd.akka.event.publisher2.domain.synchronizerState.SynchronizerState;
import com.zero.ddd.akka.event.publisher2.event.EventSynchronizer;
import com.zero.ddd.akka.event.publisher2.event.IRecordLastOffsetId;
import com.zero.ddd.akka.event.publisher2.publisher.EventPublisherFactory;
import com.zero.ddd.akka.event.publisher2.publisher.EventPublisherFactory.PartitionStoredEventWrapper;
import com.zero.ddd.akka.event.publisher2.publisher.EventPublisherFactory.StoredEventWrapper;
import com.zero.ddd.core.event.store.StoredEvent;
import com.zero.helper.GU;
import com.zero.helper.SimpleCacheBuilder;
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.PostStop;
import akka.actor.typed.PreRestart;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.Receptionist.Listing;
import akka.actor.typed.receptionist.ServiceKey;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RetryFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
*
* @say little Boy, don't be sad.
* @name Rezar
* @time 2023-06-12 03:58:36
* @Desc 些年若许,不负芳华.
*
*/
@Slf4j(topic = "event")
public class EventSynchronizerPublishBroker {
public static Behavior<EventSynchronizerBrokerEvent> create(
String eventSynchronizerId,
Materializer materizizer,
IRecordLastOffsetId iRecordLastOffsetId,
PartitionEventStore partitionEventStore,
EventPublisherFactory eventPublisherFactory) {
return Behaviors.setup(
context ->
new EventSynchronizerPublishBroker(
eventSynchronizerId,
materizizer,
iRecordLastOffsetId,
partitionEventStore,
eventPublisherFactory,
context)
.startEventPublish());
}
private final String eventSynchronizerId;
private final PartitionEventPublisher partitionEventPublisher;
private final ActorContext<EventSynchronizerBrokerEvent> context;
private final ServiceKey<EventSynchConsuemrEvent> eventConsumerServiceKey;
private final Map<String, ActorRef<EventSynchConsuemrEvent>> onlineWorker = new HashMap<>();
private SynchronizerState state;
private LoadingCache<Integer, ActorRef<EventSynchronizerBrokerEvent>> sinkActorRouteActorCache;
public EventSynchronizerPublishBroker(
String eventSynchronizerId,
Materializer materizizer,
IRecordLastOffsetId iRecordLastOffsetId,
PartitionEventStore partitionEventStore,
EventPublisherFactory eventPublisherFactory,
ActorContext<EventSynchronizerBrokerEvent> context) {
this.context = context;
this.eventSynchronizerId = eventSynchronizerId;
this.partitionEventPublisher =
new PartitionEventPublisher(
materizizer,
iRecordLastOffsetId,
partitionEventStore,
eventPublisherFactory);
this.state = new SynchronizerState();
this.eventConsumerServiceKey =
ServiceKeyHolder.eventConsumerServiceKey(
this.eventSynchronizerId);
this.initPartitionSinkActorCache();
this.registeAsSchedulerService();
this.subscribeEventConsumerService();
log.info(
"事件主题:[{}] 启动成功!!",
this.eventSynchronizerId);
}
private Behavior<EventSynchronizerBrokerEvent> startEventPublish() {
return Behaviors
.receive(EventSynchronizerBrokerEvent.class)
.onMessage(EventSynchronizerInfoReport.class, this::onEventSynchronizerInfoReport)
.onMessage(EventConsumerListingResponse.class, this::updateOnlineEventConsumerList)
.onMessage(Passivate.class, this::onPassivate)
.build();
}
private void initPartitionSinkActorCache() {
this.sinkActorRouteActorCache =
SimpleCacheBuilder.instance(
this::initPartitionSinkActor);
}
private ActorRef<EventSynchronizerBrokerEvent> initPartitionSinkActor(
int partitionId) {
return
context.spawn(
Behaviors.supervise(
Behaviors.receive(
EventSynchronizerBrokerEvent.class)
.onMessage(BrokerRoutePartitionEvent.class, this::onBrokerRoutePartitionEvent)
.onMessage(BrokerRouteBatchPartitionEvent.class, this::onBrokerRouteBatchPartitionEvent)
.onMessage(BrokerRouteConsumerAckEvent.class, this::onBrokerRouteConsumerAckEvent)
.onMessage(BrokerRoutePartitionSinkFail.class, this::onBrokerRoutePartitionSinkFail)
.onMessage(BrokerRouteConsumerNeedCompletePartitionSync.class, this::onBrokerRouteConsumerNeedCompletePartitionSync)
.onMessage(BrokerRouteConsumerNeedStartPartitionEventSync.class, this::onBrokerRouteConsumerNeedStartPartitionEventSync)
.onSignal(
PreRestart.class,
signal -> {
log.info(
"事件主题:[" + this.eventSynchronizerId + "] 分区:[" + partitionId + "] 事件背压流启动!!!");
return Behaviors.same();
})
.onSignal(
PostStop.class,
signal -> {
log.warn(
"事件主题:[" + this.eventSynchronizerId + "] 分区:[" + partitionId + "] 事件背压流停止!!!");
return Behaviors.same();
})
.build())
.onFailure(
Exception.class,
SupervisorStrategy.restart()),
// sink Actor名称
eventSynchronizerId + "-SinkActor-" + partitionId,
BlockingIODispatcherSelector.defaultDispatcher());
}
private ActorRef<EventSynchronizerBrokerEvent> partitionSinkActor(
int partition) {
return this.sinkActorRouteActorCache.getUnchecked(partition);
}
private Behavior<EventSynchronizerBrokerEvent> onPassivate(
Passivate passivate) {
this.partitionEventPublisher.shutdown();
return Behaviors.stopped();
}
private Behavior<EventSynchronizerBrokerEvent> onBrokerRouteConsumerNeedStartPartitionEventSync(
BrokerRouteConsumerNeedStartPartitionEventSync event) {
event.replyOk();
return Behaviors.same();
}
private Behavior<EventSynchronizerBrokerEvent> onBrokerRouteConsumerAckEvent(
BrokerRouteConsumerAckEvent event) {
this.partitionEventPublisher.storePartitionEventConsumedOffset(
event.getPartition(),
event.getConsumedOffset());
event.replyTo.tell(ACK.INSTANCE);
return Behaviors.same();
}
private Behavior<EventSynchronizerBrokerEvent> onBrokerRoutePartitionSin
没有合适的资源?快使用搜索试试~ 我知道了~
基于Java语言的springboot-ddd-framework设计模式通用组件框架源码
共320个文件
java:282个
xml:16个
factories:8个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 5 浏览量
2024-09-30
07:44:19
上传
评论
收藏 990KB ZIP 举报
温馨提示
该框架是一个基于Java语言的Spring Boot DDD设计模式通用组件框架,源码包含321个文件,主要涵盖282个Java类、16个XML配置文件,以及少量其他类型文件。该框架专注于实现DDD设计模式下的通用组件,旨在为软件开发提供高效、可扩展的解决方案。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java语言的springboot-ddd-framework设计模式通用组件框架源码 (320个子文件)
baseCluster.conf 2KB
jdbc-persistence.conf 634B
spring.factories 163B
spring.factories 152B
spring.factories 147B
spring.factories 145B
spring.factories 142B
spring.factories 136B
spring.factories 132B
spring.factories 120B
.gitattributes 426B
.gitignore 465B
EventSynchronizerPublishBroker3.java 25KB
EventSynchronizerPublishBroker.java 24KB
GU.java 22KB
JobScheduler.java 21KB
EventSynchronizerPublishBroker.java 18KB
ServerLockActor.java 16KB
EventPublisherFactoryByDatabase.java 15KB
InvokerCreateor.java 15KB
ServerLockActor.java 15KB
RateLimiter.java 15KB
ServerLockActor.java 15KB
ServerLockActor2.java 14KB
JobInstanceState.java 13KB
JobScheduler.java 13KB
JobManager.java 11KB
JobWorker.java 11KB
ClientLockActor.java 11KB
ClientLockActor.java 11KB
EventSynchConsuemr.java 10KB
RateLimiterServerActor.java 9KB
BeanInvokeUtils.java 9KB
EventPublisherFactoryByMongodb.java 9KB
EventPublisherFactoryByMongodb.java 9KB
JobBeanProcessor.java 9KB
BeanMethodInvoke.java 8KB
EventSynchronizerBeanProcessor.java 8KB
JobReplicatedCache.java 8KB
InterProcessLock.java 8KB
JobScheduled.java 7KB
PartitionEventPublisher.java 7KB
ListToSingleValueJsonConverter.java 7KB
DistributedJobEndpointRegister.java 7KB
EventSynchronizerPublishBroker2.java 7KB
EventSynchronizerRegister.java 6KB
LevelDBUnitOfWork.java 6KB
JacksonUtil.java 6KB
EventPublisherFactoryByMongodb.java 6KB
ProtoBufSerializeUtils.java 6KB
AssertionConcern.java 6KB
SynchronizerState.java 6KB
ClassShortNameCache.java 6KB
YamlUtil.java 6KB
InterProcessLock.java 5KB
ServerBusinessLockState.java 5KB
AkkaClusterServerInitialization.java 5KB
AkkaClusterProperties.java 5KB
ConsistencyHashAlgorithm.java 5KB
DefaultOverriderAkkaClusterConfig.java 4KB
DemoUserDisTask.java 4KB
ThreadPoolExecutorForMonitor.java 4KB
ForkJoinPoolForMonitor.java 4KB
LevelDBProvider.java 4KB
InterProcessLockTest.java 4KB
SpecialNameRegistry.java 4KB
EventPublisherBrokerServiceWatcher.java 4KB
MultiSubTaskTypeDisTaskDemo.java 4KB
SmoothRateLimiter.java 4KB
AkkaClusterLockProvider.java 4KB
BucketRateLimiter.java 4KB
RecordLastOffsetIdByDatabase.java 3KB
BusinessException.java 3KB
LockGateActor.java 3KB
RelationalDatabasePublisherStarter.java 3KB
JobScheduledConfig.java 3KB
JobTaskAssignorTest.java 3KB
PartitionEventStoreByDatabase.java 3KB
LevelDBKey.java 3KB
VisitRateLimiter.java 3KB
PublishedNotificationTracker.java 3KB
JobSchedulerServiceWatcher.java 3KB
ConcurrentStopWatch.java 3KB
SimpleCacheBuilder.java 3KB
ExponentialBackoffRetry.java 3KB
AbstractBatchNotificationPublisher.java 3KB
JobEndpoint.java 3KB
AbstractSerializer.java 3KB
CancellBlockTest.java 3KB
ProtoBufSerializeUtilsTest.java 3KB
EventPublisherStarter.java 3KB
MultiArgsEventFunctionInvoker.java 3KB
LockHoldModel.java 3KB
AppNameShardAllocationStrategy.java 3KB
SmoothWarmingUp.java 3KB
TestUserListener.java 3KB
MonitorAbleForkJoinPoolConfigurator.java 3KB
TypeFilterAndShardingHashValGenerator.java 3KB
MonitorAbleExecutorServiceConfigurator.java 3KB
EventPublisherByMongoDBStarter.java 3KB
共 320 条
- 1
- 2
- 3
- 4
资源评论
csbysj2020
- 粉丝: 2640
- 资源: 5504
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功