Code Duplication    Length = 162-162 lines in 2 locations

gvm/protocols/gmpv208/gmpv208.py 1 location

@@ 790-951 (lines=162) @@
787
788
        return self._send_xml_command(cmd)
789
790
    def get_aggregates(
        self,
        resource_type: EntityType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        sort_criteria: Optional[list] = None,
        data_columns: Optional[list] = None,
        group_column: Optional[str] = None,
        subgroup_column: Optional[str] = None,
        text_columns: Optional[list] = None,
        first_group: Optional[int] = None,
        max_groups: Optional[int] = None,
        mode: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Request aggregated information on a resource / entity type

        Additional arguments can be set via the kwargs parameter for backward
        compatibility with older versions of python-gvm, but are not validated.

        Arguments:
            resource_type: The entity type to gather data from
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            sort_criteria: List of sort criteria (dicts that can contain
                a field, stat and order)
            data_columns: List of fields to aggregate data from
            group_column: The field to group the entities by
            subgroup_column: The field to further group the entities
                inside groups by
            text_columns: List of simple text columns which no statistics
                are calculated for
            first_group: The index of the first aggregate group to return
            max_groups: The maximum number of aggregate groups to return,
                -1 for all
            mode: Special mode for aggregation. The value is converted to
                its string form before being set as an XML attribute.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If resource_type is missing, or if
                subgroup_column is given without a group_column.
            InvalidArgumentType: If resource_type is not an EntityType,
                first_group or max_groups is not an int, sort_criteria,
                data_columns or text_columns is not a list, or an entry of
                sort_criteria is not a dict.
        """
        function_name = self.get_aggregates.__name__

        if not resource_type:
            raise RequiredArgument(
                function=function_name, argument='resource_type'
            )

        if not isinstance(resource_type, self.types.EntityType):
            raise InvalidArgumentType(
                function=function_name,
                argument='resource_type',
                arg_type=self.types.EntityType.__name__,
            )

        cmd = XmlCommand('get_aggregates')

        # Audits and policies are requested as tasks / scan configs that are
        # told apart by the usage_type attribute. The comparison is done on
        # the enum values, not on the enum members themselves.
        _actual_resource_type = resource_type
        if resource_type.value == EntityType.AUDIT.value:
            _actual_resource_type = EntityType.TASK
            cmd.set_attribute('usage_type', 'audit')
        elif resource_type.value == EntityType.POLICY.value:
            _actual_resource_type = EntityType.SCAN_CONFIG
            cmd.set_attribute('usage_type', 'policy')
        elif resource_type.value in (
            EntityType.SCAN_CONFIG.value,
            EntityType.TASK.value,
        ):
            cmd.set_attribute('usage_type', 'scan')
        cmd.set_attribute('type', _actual_resource_type.value)

        _add_filter(cmd, filter, filter_id)

        # Both paging arguments share the same validation; the tuple order
        # keeps the attribute order of the generated command unchanged.
        for name, value in (
            ('first_group', first_group),
            ('max_groups', max_groups),
        ):
            if value is None:
                continue
            if not isinstance(value, int):
                raise InvalidArgumentType(
                    function=function_name,
                    argument=name,
                    arg_type=int.__name__,
                )
            cmd.set_attribute(name, str(value))

        if sort_criteria is not None:
            if not isinstance(sort_criteria, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='sort_criteria',
                    arg_type=list.__name__,
                )
            for sort in sort_criteria:
                if not isinstance(sort, dict):
                    raise InvalidArgumentType(
                        function=function_name,
                        argument='sort_criteria',
                    )

                sort_elem = cmd.add_element('sort')

                field = sort.get('field')
                if field:
                    sort_elem.set_attribute('field', field)

                # stat and order are accepted either as enum members or as
                # strings that get converted to the matching enum member.
                stat = sort.get('stat')
                if stat:
                    if not isinstance(stat, AggregateStatistic):
                        stat = get_aggregate_statistic_from_string(stat)
                    sort_elem.set_attribute('stat', stat.value)

                order = sort.get('order')
                if order:
                    if not isinstance(order, SortOrder):
                        order = get_sort_order_from_string(order)
                    sort_elem.set_attribute('order', order.value)

        if data_columns is not None:
            if not isinstance(data_columns, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='data_columns',
                    arg_type=list.__name__,
                )
            for column in data_columns:
                cmd.add_element('data_column', column)

        if group_column is not None:
            cmd.set_attribute('group_column', group_column)

        if subgroup_column is not None:
            if not group_column:
                raise RequiredArgument(
                    '{} requires a group_column argument'
                    ' if subgroup_column is given'.format(function_name),
                    function=function_name,
                    argument='subgroup_column',
                )
            cmd.set_attribute('subgroup_column', subgroup_column)

        if text_columns is not None:
            if not isinstance(text_columns, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='text_columns',
                    arg_type=list.__name__,
                )
            for column in text_columns:
                cmd.add_element('text_column', column)

        if mode is not None:
            # Convert like first_group / max_groups: attribute values have to
            # be strings, and str() leaves an already-string mode untouched.
            cmd.set_attribute('mode', str(mode))

        # Add additional keyword args as attributes for backward compatibility.
        cmd.set_attributes(kwargs)

        return self._send_xml_command(cmd)
952
953
    def get_tls_certificates(
954
        self,

gvm/protocols/gmpv9/gmpv9.py 1 location

@@ 663-824 (lines=162) @@
660
661
        return self._send_xml_command(cmd)
662
663
    def get_aggregates(
        self,
        resource_type: EntityType,
        *,
        filter: Optional[str] = None,
        filter_id: Optional[str] = None,
        sort_criteria: Optional[list] = None,
        data_columns: Optional[list] = None,
        group_column: Optional[str] = None,
        subgroup_column: Optional[str] = None,
        text_columns: Optional[list] = None,
        first_group: Optional[int] = None,
        max_groups: Optional[int] = None,
        mode: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Request aggregated information on a resource / entity type

        Additional arguments can be set via the kwargs parameter for backward
        compatibility with older versions of python-gvm, but are not validated.

        Arguments:
            resource_type: The entity type to gather data from
            filter: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            sort_criteria: List of sort criteria (dicts that can contain
                a field, stat and order)
            data_columns: List of fields to aggregate data from
            group_column: The field to group the entities by
            subgroup_column: The field to further group the entities
                inside groups by
            text_columns: List of simple text columns which no statistics
                are calculated for
            first_group: The index of the first aggregate group to return
            max_groups: The maximum number of aggregate groups to return,
                -1 for all
            mode: Special mode for aggregation. The value is converted to
                its string form before being set as an XML attribute.

        Returns:
            The response. See :py:meth:`send_command` for details.

        Raises:
            RequiredArgument: If resource_type is missing, or if
                subgroup_column is given without a group_column.
            InvalidArgumentType: If resource_type is not an EntityType,
                first_group or max_groups is not an int, sort_criteria,
                data_columns or text_columns is not a list, or an entry of
                sort_criteria is not a dict.
        """
        function_name = self.get_aggregates.__name__

        if not resource_type:
            raise RequiredArgument(
                function=function_name, argument='resource_type'
            )

        if not isinstance(resource_type, self.types.EntityType):
            raise InvalidArgumentType(
                function=function_name,
                argument='resource_type',
                arg_type=self.types.EntityType.__name__,
            )

        cmd = XmlCommand('get_aggregates')

        # Audits and policies are requested as tasks / scan configs that are
        # told apart by the usage_type attribute. The comparison is done on
        # the enum values, not on the enum members themselves.
        _actual_resource_type = resource_type
        if resource_type.value == EntityType.AUDIT.value:
            _actual_resource_type = EntityType.TASK
            cmd.set_attribute('usage_type', 'audit')
        elif resource_type.value == EntityType.POLICY.value:
            _actual_resource_type = EntityType.SCAN_CONFIG
            cmd.set_attribute('usage_type', 'policy')
        elif resource_type.value in (
            EntityType.SCAN_CONFIG.value,
            EntityType.TASK.value,
        ):
            cmd.set_attribute('usage_type', 'scan')
        cmd.set_attribute('type', _actual_resource_type.value)

        _add_filter(cmd, filter, filter_id)

        # Both paging arguments share the same validation; the tuple order
        # keeps the attribute order of the generated command unchanged.
        for name, value in (
            ('first_group', first_group),
            ('max_groups', max_groups),
        ):
            if value is None:
                continue
            if not isinstance(value, int):
                raise InvalidArgumentType(
                    function=function_name,
                    argument=name,
                    arg_type=int.__name__,
                )
            cmd.set_attribute(name, str(value))

        if sort_criteria is not None:
            if not isinstance(sort_criteria, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='sort_criteria',
                    arg_type=list.__name__,
                )
            for sort in sort_criteria:
                if not isinstance(sort, dict):
                    raise InvalidArgumentType(
                        function=function_name,
                        argument='sort_criteria',
                    )

                sort_elem = cmd.add_element('sort')

                field = sort.get('field')
                if field:
                    sort_elem.set_attribute('field', field)

                # stat and order are accepted either as enum members or as
                # strings that get converted to the matching enum member.
                stat = sort.get('stat')
                if stat:
                    if not isinstance(stat, AggregateStatistic):
                        stat = get_aggregate_statistic_from_string(stat)
                    sort_elem.set_attribute('stat', stat.value)

                order = sort.get('order')
                if order:
                    if not isinstance(order, SortOrder):
                        order = get_sort_order_from_string(order)
                    sort_elem.set_attribute('order', order.value)

        if data_columns is not None:
            if not isinstance(data_columns, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='data_columns',
                    arg_type=list.__name__,
                )
            for column in data_columns:
                cmd.add_element('data_column', column)

        if group_column is not None:
            cmd.set_attribute('group_column', group_column)

        if subgroup_column is not None:
            if not group_column:
                raise RequiredArgument(
                    '{} requires a group_column argument'
                    ' if subgroup_column is given'.format(function_name),
                    function=function_name,
                    argument='subgroup_column',
                )
            cmd.set_attribute('subgroup_column', subgroup_column)

        if text_columns is not None:
            if not isinstance(text_columns, list):
                raise InvalidArgumentType(
                    function=function_name,
                    argument='text_columns',
                    arg_type=list.__name__,
                )
            for column in text_columns:
                cmd.add_element('text_column', column)

        if mode is not None:
            # Convert like first_group / max_groups: attribute values have to
            # be strings, and str() leaves an already-string mode untouched.
            cmd.set_attribute('mode', str(mode))

        # Add additional keyword args as attributes for backward compatibility.
        cmd.set_attributes(kwargs)

        return self._send_xml_command(cmd)
825
826
    def get_tls_certificates(
827
        self,