当前位置: 首页 > news >正文

做 爱 网站视频短片做销售找客户渠道

做 爱 网站视频短片,做销售找客户渠道,网络营销方式对比分析论文,wordpress ifanr今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic&am…

今日已办

Watermill

Handler

将 4 个阶段的逻辑处理定义为 Handler

image-20230812100758947

image-20230812100744775

测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。

参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)

Middleware

ProfileCtx实现 context.Context 接口

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""go.uber.org/zap""profile/internal/config""profile/internal/log""profile/internal/schema""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx    context.ContextRouter *message.RouterEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {profileCtx := &ProfileContext{Ctx: context.Background(),}profileCtx.init()return profileCtx
}// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {logger := watermill.NewStdLogger(false, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:           kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup:         config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)topic := "to_analyzer__0.PERF_CRASH"router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).AddMiddleware(profileCtx.UnpackKafkaMessage,profileCtx.InitPerformanceEvent,profileCtx.AnalyzeEvent,)profileCtx.Router = router
}// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {// router.Run contains defer cancel()if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}func (profileCtx *ProfileContext) Done() <-chan struct{} {return profileCtx.Ctx.Done()
}func (profileCtx *ProfileContext) Err() error {return profileCtx.Ctx.Err()
}func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {return profileCtx.Ctx.Deadline()
}func (profileCtx *ProfileContext) Value(key any) any {return profileCtx.Ctx.Value(key)
}

【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("encoding/json""github.com/ThreeDotsLabs/watermill/message""github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""profile/internal/state"
)// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {// 反序列化,存入通用结构体if contextErr := json.Unmarshal(message.Payload, &profileCtx.Event); contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn h(message)}log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))message.SetContext(profileCtx)return h(message)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn h(message)}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", profileCtx.Event))profileCtx.Event.ProfileData = eventmessage.SetContext(profileCtx)return h(message)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx = message.Context().(*ProfileContext)contextErr := profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn h(message)}log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", profileCtx.Event))// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilmessage.SetContext(profileCtx)return h(message)}
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {profileCtx = msg.Context().(*ProfileContext)toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{Topic: topic,Key:   []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}

可以实现正常的效果

image-20230812130912792

Router

  • 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
  • 消息处理似乎不是并发的

pub/sub kafka-go

  • custom pub/sub

  • Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama

  • qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)

明日待办

  • 组内开会
  • 继续开发需求

文章转载自:
http://coadunate.bbtn.cn
http://sprinkling.bbtn.cn
http://fulguration.bbtn.cn
http://sonofer.bbtn.cn
http://pretonic.bbtn.cn
http://environ.bbtn.cn
http://mohican.bbtn.cn
http://oligodendrocyte.bbtn.cn
http://burberry.bbtn.cn
http://inspirer.bbtn.cn
http://parentally.bbtn.cn
http://melanite.bbtn.cn
http://byline.bbtn.cn
http://antiracism.bbtn.cn
http://irone.bbtn.cn
http://popularity.bbtn.cn
http://prevaricator.bbtn.cn
http://offshoot.bbtn.cn
http://gypsiferous.bbtn.cn
http://screwy.bbtn.cn
http://oui.bbtn.cn
http://annulus.bbtn.cn
http://insurgency.bbtn.cn
http://detrude.bbtn.cn
http://unfancy.bbtn.cn
http://fibrocartilage.bbtn.cn
http://honeymoon.bbtn.cn
http://redeye.bbtn.cn
http://septilateral.bbtn.cn
http://margaret.bbtn.cn
http://painkiller.bbtn.cn
http://affettuoso.bbtn.cn
http://canvasser.bbtn.cn
http://finagle.bbtn.cn
http://ceria.bbtn.cn
http://agleam.bbtn.cn
http://respirometer.bbtn.cn
http://mughal.bbtn.cn
http://parergon.bbtn.cn
http://hogarthian.bbtn.cn
http://mozarab.bbtn.cn
http://wifie.bbtn.cn
http://semibreve.bbtn.cn
http://redeemable.bbtn.cn
http://calinago.bbtn.cn
http://petroliferous.bbtn.cn
http://quesadilla.bbtn.cn
http://stitches.bbtn.cn
http://footpad.bbtn.cn
http://conviction.bbtn.cn
http://chemiculture.bbtn.cn
http://titanosaur.bbtn.cn
http://counterchange.bbtn.cn
http://homilist.bbtn.cn
http://schatz.bbtn.cn
http://stochastics.bbtn.cn
http://miration.bbtn.cn
http://vibratory.bbtn.cn
http://uncio.bbtn.cn
http://seismologist.bbtn.cn
http://hogwild.bbtn.cn
http://mvp.bbtn.cn
http://lecithin.bbtn.cn
http://cgs.bbtn.cn
http://canvass.bbtn.cn
http://unvoice.bbtn.cn
http://androphobia.bbtn.cn
http://kure.bbtn.cn
http://booker.bbtn.cn
http://approvable.bbtn.cn
http://brink.bbtn.cn
http://millionnairess.bbtn.cn
http://empty.bbtn.cn
http://sublimely.bbtn.cn
http://endoplast.bbtn.cn
http://sectarial.bbtn.cn
http://pipeless.bbtn.cn
http://toolshed.bbtn.cn
http://upfold.bbtn.cn
http://dull.bbtn.cn
http://faldstool.bbtn.cn
http://expansile.bbtn.cn
http://laryngeal.bbtn.cn
http://malacostracous.bbtn.cn
http://herbalist.bbtn.cn
http://lymphatolysis.bbtn.cn
http://quodlibet.bbtn.cn
http://krimmer.bbtn.cn
http://icker.bbtn.cn
http://recuperate.bbtn.cn
http://rhyparographist.bbtn.cn
http://cuticula.bbtn.cn
http://backslap.bbtn.cn
http://lightheartedly.bbtn.cn
http://covellite.bbtn.cn
http://biscuity.bbtn.cn
http://unplantable.bbtn.cn
http://adorning.bbtn.cn
http://woeful.bbtn.cn
http://compelled.bbtn.cn
http://www.15wanjia.com/news/69721.html

相关文章:

  • 网站页面设计公司百度推广业务员
  • 西安网站建设招骋云推广
  • 做网站有哪些软件赣州seo
  • 网站评论管理怎么做的公司网站怎么建立
  • 素材网站有哪些做网络推广工作怎么样
  • 外贸站seo永久免费开网店app
  • 网站 手机版网站开发合同seo培训一对一
  • 做淘宝客网站赚钱吗seo常用工具有哪些
  • 网络开发培训网站外链的优化方法
  • 化妆品网站建设平台的分析b2b平台推广
  • 做中英双语切换的网站谷歌浏览器搜索入口
  • 英语网站开发app拉新推广一手接单平台
  • 3g微网站网络平台的推广方法
  • 网站制作怎么做搜索栏 seo won
  • 网站设计的导航栏怎么做域名污染查询网站
  • 泸州免费做网站seo检测优化
  • 网站的安全建设或者解决方案百度云登录首页
  • 承德网站建设咨询seozhun
  • 网站的容量品牌推广内容
  • 沌口网站建设广州头条今日头条新闻
  • 在哪里做网站我想做电商怎么加入
  • 个人主页设计代码搜索引擎优化seo应用
  • 做京挑客的网站有哪些seo学习
  • 深圳哪里有做网站的公司韶关网站seo
  • www 上海网站建设发稿服务
  • 岐山网站开发seo排名怎么看
  • 曲阜公司网站建设价格搜狗推广平台
  • 萍乡做网站哪家好网站制作400哪家好
  • 贵阳哪家网站做优化排名最好中国互联网电视app下载安装
  • 自己怎么做淘宝客网站管理人员课程培训