Code Duplication    Length = 56-56 lines in 2 locations

src/MessageMutation/Pipeline/Incoming/IncomingPhysicalMessageMutationPipelineStep.php 1 location

@@ 9-64 (lines=56) @@
6
use PSB\Core\Pipeline\Incoming\StageContext\IncomingPhysicalMessageContext;
7
use PSB\Core\Pipeline\PipelineStepInterface;
8
9
/**
 * Pipeline step that runs the registered incoming physical message mutators
 * against the body and headers of the message carried by the stage context.
 */
class IncomingPhysicalMessageMutationPipelineStep implements PipelineStepInterface
{
    /**
     * Registry queried for the ids of the incoming physical message mutators.
     *
     * @var MessageMutatorRegistry
     */
    private $mutatorRegistry;

    /**
     * @param MessageMutatorRegistry $mutatorRegistry Registry that supplies the mutator ids.
     */
    public function __construct(MessageMutatorRegistry $mutatorRegistry)
    {
        $this->mutatorRegistry = $mutatorRegistry;
    }

    /**
     * Applies every registered incoming mutator to the message, then calls the next step.
     *
     * When the registry returns no mutator ids the message is left untouched and
     * only $next is invoked.
     *
     * @param IncomingPhysicalMessageContext $context
     * @param callable                       $next
     */
    public function invoke($context, callable $next)
    {
        $ids = $this->mutatorRegistry->getIncomingPhysicalMessageMutatorIds();

        if (!empty($ids)) {
            $message = $context->getMessage();

            // Mutators operate on a dedicated context seeded with the current body and headers.
            $mutationContext = new IncomingPhysicalMessageMutationContext(
                $message->getBody(),
                $message->getHeaders()
            );

            foreach ($ids as $id) {
                /** @var IncomingPhysicalMessageMutatorInterface $mutator */
                $mutator = $context->getBuilder()->build($id);
                $mutator->mutateIncoming($mutationContext);
            }

            // Write the (possibly changed) body and headers back onto the message.
            $message->replaceBody($mutationContext->getBody());
            $message->replaceHeaders($mutationContext->getHeaders());
        }

        $next();
    }

    /**
     * @return string The class name of IncomingPhysicalMessageContext.
     */
    public static function getStageContextClass()
    {
        return IncomingPhysicalMessageContext::class;
    }
}
65

src/MessageMutation/Pipeline/Outgoing/OutgoingPhysicalMessageMutationPipelineStep.php 1 location

@@ 9-64 (lines=56) @@
6
use PSB\Core\Pipeline\Outgoing\StageContext\OutgoingPhysicalMessageContext;
7
use PSB\Core\Pipeline\PipelineStepInterface;
8
9
/**
 * Pipeline step that runs the registered outgoing physical message mutators
 * against the body and headers of the message carried by the stage context.
 */
class OutgoingPhysicalMessageMutationPipelineStep implements PipelineStepInterface
{
    /**
     * Registry queried for the ids of the outgoing physical message mutators.
     *
     * @var MessageMutatorRegistry
     */
    private $mutatorRegistry;

    /**
     * OutgoingPhysicalMessageMutationPipelineStep constructor.
     *
     * @param MessageMutatorRegistry $mutatorRegistry Registry that supplies the mutator ids.
     */
    public function __construct(MessageMutatorRegistry $mutatorRegistry)
    {
        $this->mutatorRegistry = $mutatorRegistry;
    }

    /**
     * Applies every registered outgoing mutator to the message, then calls the next step.
     *
     * When the registry returns no mutator ids the message is left untouched and
     * only $next is invoked.
     *
     * @param OutgoingPhysicalMessageContext $context
     * @param callable                       $next
     */
    public function invoke($context, callable $next)
    {
        $ids = $this->mutatorRegistry->getOutgoingPhysicalMessageMutatorIds();

        if (!empty($ids)) {
            $message = $context->getMessage();

            // Mutators operate on a dedicated context seeded with the current body and headers.
            $mutationContext = new OutgoingPhysicalMessageMutationContext(
                $message->getBody(),
                $message->getHeaders()
            );

            foreach ($ids as $id) {
                /** @var OutgoingPhysicalMessageMutatorInterface $mutator */
                $mutator = $context->getBuilder()->build($id);
                $mutator->mutateOutgoing($mutationContext);
            }

            // Write the (possibly changed) body and headers back onto the message.
            $message->replaceBody($mutationContext->getBody());
            $message->replaceHeaders($mutationContext->getHeaders());
        }

        $next();
    }

    /**
     * @return string The class name of OutgoingPhysicalMessageContext.
     */
    public static function getStageContextClass()
    {
        return OutgoingPhysicalMessageContext::class;
    }
}
65