Code Duplication    Length = 162-162 lines in 2 locations

gvm/protocols/gmpv9/gmpv9.py 1 location

@@ 663-824 (lines=162) @@
660
661
        return self._send_xml_command(cmd)
662
663
    def get_aggregates(
664
        self,
665
        resource_type: EntityType,
666
        *,
667
        filter: Optional[str] = None,
668
        filter_id: Optional[str] = None,
669
        sort_criteria: Optional[list] = None,
670
        data_columns: Optional[list] = None,
671
        group_column: Optional[str] = None,
672
        subgroup_column: Optional[str] = None,
673
        text_columns: Optional[list] = None,
674
        first_group: Optional[int] = None,
675
        max_groups: Optional[int] = None,
676
        mode: Optional[int] = None,
677
        **kwargs,
678
    ) -> Any:
679
        """Request aggregated information on a resource / entity type
680
681
        Additional arguments can be set via the kwargs parameter for backward
682
        compatibility with older versions of python-gvm, but are not validated.
683
684
        Arguments:
685
            resource_type: The entity type to gather data from
686
            filter: Filter term to use for the query
687
            filter_id: UUID of an existing filter to use for the query
688
            sort_criteria: List of sort criteria (dicts that can contain
689
                a field, stat and order)
690
            data_columns: List of fields to aggregate data from
691
            group_column: The field to group the entities by
692
            subgroup_column: The field to further group the entities
693
                inside groups by
694
            text_columns: List of simple text columns which no statistics
695
                are calculated for
696
            first_group: The index of the first aggregate group to return
697
            max_groups: The maximum number of aggregate groups to return,
698
                -1 for all
699
            mode: Special mode for aggregation
700
701
        Returns:
702
            The response. See :py:meth:`send_command` for details.
703
        """
704
        if not resource_type:
705
            raise RequiredArgument(
706
                function=self.get_aggregates.__name__, argument='resource_type'
707
            )
708
709
        if not isinstance(resource_type, self.types.EntityType):
710
            raise InvalidArgumentType(
711
                function=self.get_aggregates.__name__,
712
                argument='resource_type',
713
                arg_type=self.types.EntityType.__name__,
714
            )
715
716
        cmd = XmlCommand('get_aggregates')
717
718
        _actual_resource_type = resource_type
719
        if resource_type.value == EntityType.AUDIT.value:
720
            _actual_resource_type = EntityType.TASK
721
            cmd.set_attribute('usage_type', 'audit')
722
        elif resource_type.value == EntityType.POLICY.value:
723
            _actual_resource_type = EntityType.SCAN_CONFIG
724
            cmd.set_attribute('usage_type', 'policy')
725
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
726
            cmd.set_attribute('usage_type', 'scan')
727
        elif resource_type.value == EntityType.TASK.value:
728
            cmd.set_attribute('usage_type', 'scan')
729
        cmd.set_attribute('type', _actual_resource_type.value)
730
731
        _add_filter(cmd, filter, filter_id)
732
733
        if first_group is not None:
734
            if not isinstance(first_group, int):
735
                raise InvalidArgumentType(
736
                    function=self.get_aggregates.__name__,
737
                    argument='first_group',
738
                    arg_type=int.__name__,
739
                )
740
            cmd.set_attribute('first_group', str(first_group))
741
742
        if max_groups is not None:
743
            if not isinstance(max_groups, int):
744
                raise InvalidArgumentType(
745
                    function=self.get_aggregates.__name__,
746
                    argument='max_groups',
747
                    arg_type=int.__name__,
748
                )
749
            cmd.set_attribute('max_groups', str(max_groups))
750
751
        if sort_criteria is not None:
752
            if not isinstance(sort_criteria, list):
753
                raise InvalidArgumentType(
754
                    function=self.get_aggregates.__name__,
755
                    argument='sort_criteria',
756
                    arg_type=list.__name__,
757
                )
758
            for sort in sort_criteria:
759
                if not isinstance(sort, dict):
760
                    raise InvalidArgumentType(
761
                        function=self.get_aggregates.__name__,
762
                        argument='sort_criteria',
763
                    )
764
765
                sort_elem = cmd.add_element('sort')
766
                if sort.get('field'):
767
                    sort_elem.set_attribute('field', sort.get('field'))
768
769
                if sort.get('stat'):
770
                    if isinstance(sort['stat'], AggregateStatistic):
771
                        sort_elem.set_attribute('stat', sort['stat'].value)
772
                    else:
773
                        stat = get_aggregate_statistic_from_string(sort['stat'])
774
                        sort_elem.set_attribute('stat', stat.value)
775
776
                if sort.get('order'):
777
                    if isinstance(sort['order'], SortOrder):
778
                        sort_elem.set_attribute('order', sort['order'].value)
779
                    else:
780
                        so = get_sort_order_from_string(sort['order'])
781
                        sort_elem.set_attribute('order', so.value)
782
783
        if data_columns is not None:
784
            if not isinstance(data_columns, list):
785
                raise InvalidArgumentType(
786
                    function=self.get_aggregates.__name__,
787
                    argument='data_columns',
788
                    arg_type=list.__name__,
789
                )
790
            for column in data_columns:
791
                cmd.add_element('data_column', column)
792
793
        if group_column is not None:
794
            cmd.set_attribute('group_column', group_column)
795
796
        if subgroup_column is not None:
797
            if not group_column:
798
                raise RequiredArgument(
799
                    '{} requires a group_column argument'
800
                    ' if subgroup_column is given'.format(
801
                        self.get_aggregates.__name__
802
                    ),
803
                    function=self.get_aggregates.__name__,
804
                    argument='subgroup_column',
805
                )
806
            cmd.set_attribute('subgroup_column', subgroup_column)
807
808
        if text_columns is not None:
809
            if not isinstance(text_columns, list):
810
                raise InvalidArgumentType(
811
                    function=self.get_aggregates.__name__,
812
                    argument='text_columns',
813
                    arg_type=list.__name__,
814
                )
815
            for column in text_columns:
816
                cmd.add_element('text_column', column)
817
818
        if mode is not None:
819
            cmd.set_attribute('mode', mode)
820
821
        # Add additional keyword args as attributes for backward compatibility.
822
        cmd.set_attributes(kwargs)
823
824
        return self._send_xml_command(cmd)
825
826
    def get_tls_certificates(
827
        self,

gvm/protocols/gmpv208/gmpv208.py 1 location

@@ 784-945 (lines=162) @@
781
782
        return self._send_xml_command(cmd)
783
784
    def get_aggregates(
785
        self,
786
        resource_type: EntityType,
787
        *,
788
        filter: Optional[str] = None,
789
        filter_id: Optional[str] = None,
790
        sort_criteria: Optional[list] = None,
791
        data_columns: Optional[list] = None,
792
        group_column: Optional[str] = None,
793
        subgroup_column: Optional[str] = None,
794
        text_columns: Optional[list] = None,
795
        first_group: Optional[int] = None,
796
        max_groups: Optional[int] = None,
797
        mode: Optional[int] = None,
798
        **kwargs,
799
    ) -> Any:
800
        """Request aggregated information on a resource / entity type
801
802
        Additional arguments can be set via the kwargs parameter for backward
803
        compatibility with older versions of python-gvm, but are not validated.
804
805
        Arguments:
806
            resource_type: The entity type to gather data from
807
            filter: Filter term to use for the query
808
            filter_id: UUID of an existing filter to use for the query
809
            sort_criteria: List of sort criteria (dicts that can contain
810
                a field, stat and order)
811
            data_columns: List of fields to aggregate data from
812
            group_column: The field to group the entities by
813
            subgroup_column: The field to further group the entities
814
                inside groups by
815
            text_columns: List of simple text columns which no statistics
816
                are calculated for
817
            first_group: The index of the first aggregate group to return
818
            max_groups: The maximum number of aggregate groups to return,
819
                -1 for all
820
            mode: Special mode for aggregation
821
822
        Returns:
823
            The response. See :py:meth:`send_command` for details.
824
        """
825
        if not resource_type:
826
            raise RequiredArgument(
827
                function=self.get_aggregates.__name__, argument='resource_type'
828
            )
829
830
        if not isinstance(resource_type, self.types.EntityType):
831
            raise InvalidArgumentType(
832
                function=self.get_aggregates.__name__,
833
                argument='resource_type',
834
                arg_type=self.types.EntityType.__name__,
835
            )
836
837
        cmd = XmlCommand('get_aggregates')
838
839
        _actual_resource_type = resource_type
840
        if resource_type.value == EntityType.AUDIT.value:
841
            _actual_resource_type = EntityType.TASK
842
            cmd.set_attribute('usage_type', 'audit')
843
        elif resource_type.value == EntityType.POLICY.value:
844
            _actual_resource_type = EntityType.SCAN_CONFIG
845
            cmd.set_attribute('usage_type', 'policy')
846
        elif resource_type.value == EntityType.SCAN_CONFIG.value:
847
            cmd.set_attribute('usage_type', 'scan')
848
        elif resource_type.value == EntityType.TASK.value:
849
            cmd.set_attribute('usage_type', 'scan')
850
        cmd.set_attribute('type', _actual_resource_type.value)
851
852
        _add_filter(cmd, filter, filter_id)
853
854
        if first_group is not None:
855
            if not isinstance(first_group, int):
856
                raise InvalidArgumentType(
857
                    function=self.get_aggregates.__name__,
858
                    argument='first_group',
859
                    arg_type=int.__name__,
860
                )
861
            cmd.set_attribute('first_group', str(first_group))
862
863
        if max_groups is not None:
864
            if not isinstance(max_groups, int):
865
                raise InvalidArgumentType(
866
                    function=self.get_aggregates.__name__,
867
                    argument='max_groups',
868
                    arg_type=int.__name__,
869
                )
870
            cmd.set_attribute('max_groups', str(max_groups))
871
872
        if sort_criteria is not None:
873
            if not isinstance(sort_criteria, list):
874
                raise InvalidArgumentType(
875
                    function=self.get_aggregates.__name__,
876
                    argument='sort_criteria',
877
                    arg_type=list.__name__,
878
                )
879
            for sort in sort_criteria:
880
                if not isinstance(sort, dict):
881
                    raise InvalidArgumentType(
882
                        function=self.get_aggregates.__name__,
883
                        argument='sort_criteria',
884
                    )
885
886
                sort_elem = cmd.add_element('sort')
887
                if sort.get('field'):
888
                    sort_elem.set_attribute('field', sort.get('field'))
889
890
                if sort.get('stat'):
891
                    if isinstance(sort['stat'], AggregateStatistic):
892
                        sort_elem.set_attribute('stat', sort['stat'].value)
893
                    else:
894
                        stat = get_aggregate_statistic_from_string(sort['stat'])
895
                        sort_elem.set_attribute('stat', stat.value)
896
897
                if sort.get('order'):
898
                    if isinstance(sort['order'], SortOrder):
899
                        sort_elem.set_attribute('order', sort['order'].value)
900
                    else:
901
                        so = get_sort_order_from_string(sort['order'])
902
                        sort_elem.set_attribute('order', so.value)
903
904
        if data_columns is not None:
905
            if not isinstance(data_columns, list):
906
                raise InvalidArgumentType(
907
                    function=self.get_aggregates.__name__,
908
                    argument='data_columns',
909
                    arg_type=list.__name__,
910
                )
911
            for column in data_columns:
912
                cmd.add_element('data_column', column)
913
914
        if group_column is not None:
915
            cmd.set_attribute('group_column', group_column)
916
917
        if subgroup_column is not None:
918
            if not group_column:
919
                raise RequiredArgument(
920
                    '{} requires a group_column argument'
921
                    ' if subgroup_column is given'.format(
922
                        self.get_aggregates.__name__
923
                    ),
924
                    function=self.get_aggregates.__name__,
925
                    argument='subgroup_column',
926
                )
927
            cmd.set_attribute('subgroup_column', subgroup_column)
928
929
        if text_columns is not None:
930
            if not isinstance(text_columns, list):
931
                raise InvalidArgumentType(
932
                    function=self.get_aggregates.__name__,
933
                    argument='text_columns',
934
                    arg_type=list.__name__,
935
                )
936
            for column in text_columns:
937
                cmd.add_element('text_column', column)
938
939
        if mode is not None:
940
            cmd.set_attribute('mode', mode)
941
942
        # Add additional keyword args as attributes for backward compatibility.
943
        cmd.set_attributes(kwargs)
944
945
        return self._send_xml_command(cmd)
946
947
    def get_tls_certificates(
948
        self,