Code Duplication: 5 similar blocks found, each 8-14 lines long

src/Pipeline/BusOperations.php (5 locations)

@@ 59-66 (lines=8) @@
56
     * @param SendOptions          $options
57
     * @param PipelineStageContext $parentContext
58
     */
59
    public function send($message, SendOptions $options, PipelineStageContext $parentContext)
60
    {
61
        $this->ensureMessageId($options);
62
        $sendContext = $this->busOperationsContextFactory->createSendContext($message, $options, $parentContext);
63
        $pipeline = $this->pipelineFactory->createStartingWith(get_class($sendContext), $this->pipelineModifications);
64
65
        $pipeline->invoke($sendContext);
66
    }
67
68
    /**
69
     * @param object               $message
@@ 85-95 (lines=11) @@
82
     * @param PublishOptions       $options
83
     * @param PipelineStageContext $parentContext
84
     */
85
    public function publish($message, PublishOptions $options, PipelineStageContext $parentContext)
86
    {
87
        $this->ensureMessageId($options);
88
        $publishContext = $this->busOperationsContextFactory->createPublishContext($message, $options, $parentContext);
89
        $pipeline = $this->pipelineFactory->createStartingWith(
90
            get_class($publishContext),
91
            $this->pipelineModifications
92
        );
93
94
        $pipeline->invoke($publishContext);
95
    }
96
97
    /**
98
     * @param object          $message
@@ 102-112 (lines=11) @@
99
     * @param ReplyOptions    $options
100
     * @param IncomingContext $parentContext
101
     */
102
    public function reply($message, ReplyOptions $options, IncomingContext $parentContext)
103
    {
104
        $this->ensureMessageId($options);
105
        $publishContext = $this->busOperationsContextFactory->createReplyContext($message, $options, $parentContext);
106
        $pipeline = $this->pipelineFactory->createStartingWith(
107
            get_class($publishContext),
108
            $this->pipelineModifications
109
        );
110
111
        $pipeline->invoke($publishContext);
112
    }
113
114
    /**
115
     * @param string               $eventFqcn
@@ 119-132 (lines=14) @@
116
     * @param SubscribeOptions     $options
117
     * @param PipelineStageContext $parentContext
118
     */
119
    public function subscribe($eventFqcn, SubscribeOptions $options, PipelineStageContext $parentContext)
120
    {
121
        $subscribeContext = $this->busOperationsContextFactory->createSubscribeContext(
122
            $eventFqcn,
123
            $options,
124
            $parentContext
125
        );
126
        $pipeline = $this->pipelineFactory->createStartingWith(
127
            get_class($subscribeContext),
128
            $this->pipelineModifications
129
        );
130
131
        $pipeline->invoke($subscribeContext);
132
    }
133
134
    /**
135
     * @param string               $eventFqcn
@@ 139-152 (lines=14) @@
136
     * @param UnsubscribeOptions   $options
137
     * @param PipelineStageContext $parentContext
138
     */
139
    public function unsubscribe($eventFqcn, UnsubscribeOptions $options, PipelineStageContext $parentContext)
140
    {
141
        $unsubscribeContext = $this->busOperationsContextFactory->createUnsubscribeContext(
142
            $eventFqcn,
143
            $options,
144
            $parentContext
145
        );
146
        $pipeline = $this->pipelineFactory->createStartingWith(
147
            get_class($unsubscribeContext),
148
            $this->pipelineModifications
149
        );
150
151
        $pipeline->invoke($unsubscribeContext);
152
    }
153
154
    /**
155
     * @param OutgoingOptions $options