Skip to content

API Documentation

API Endpoints

Enrichment

Handle enrichment requests for a specific observable (domain or IP address).

Parameters:

Name Type Description Default
request

The incoming request object containing query parameters.

required

Returns:

Name Type Description
Response

A JSON response indicating whether the observable was found,

and if so, the corresponding IOC.

Source code in docs/Submodules/GreedyBear/api/views/enrichment.py
@api_view([GET])
@authentication_classes([CookieTokenAuthentication])
@permission_classes([IsAuthenticated])
def enrichment_view(request):
    """
    Validate an enrichment query for one observable and return the serialized result.

    The observable is read from the ``query`` query parameter. The full set of
    query parameters is validated by ``EnrichmentSerializer``; a validation
    failure raises (``raise_exception=True``) before any statistics are recorded.

    Args:
        request: The incoming request object containing query parameters.

    Returns:
        Response: HTTP 200 carrying ``serializer.data`` (a JSON payload indicating
        whether the observable was found and, if so, the corresponding IOC).
    """
    params = request.query_params
    queried = params.get("query")
    logger.info(f"Enrichment view requested for: {queried}")

    # Validation happens first so that rejected requests are not counted below.
    serializer = EnrichmentSerializer(data=params, context={"request": request})
    serializer.is_valid(raise_exception=True)

    # Record one Statistics row per successfully validated enrichment request.
    Statistics(
        source=str(request.META["REMOTE_ADDR"]),
        view=viewType.ENRICHMENT_VIEW.value,
    ).save()

    return Response(serializer.data, status=status.HTTP_200_OK)

Feeds

Handle requests for IOC feeds with specific parameters and format the response accordingly.

Parameters:

Name Type Description Default
request

The incoming request object.

required
feed_type str

Type of feed (e.g., log4j, cowrie, etc.).

required
attack_type str

Type of attack (e.g., all, specific attack types).

required
prioritize str

Prioritization mechanism to use (e.g., recent, persistent).

required
format_ str

Desired format of the response (e.g., json, csv, txt).

required
include_mass_scanners bool

query parameter flag to include IOCs that are known mass scanners.

required
include_tor_exit_nodes bool

query parameter flag to include IOCs that are known tor exit nodes.

required

Returns:

Name Type Description
Response

The HTTP response with formatted IOC data.

Source code in docs/Submodules/GreedyBear/api/views/feeds.py
@api_view([GET])
def feeds(request, feed_type, attack_type, prioritize, format_):
    """
    Serve an IOC feed selected by path parameters, in the requested format.

    Args:
        request: The incoming request object.
        feed_type (str): Type of feed (e.g., log4j, cowrie, etc.).
        attack_type (str): Type of attack (e.g., all, specific attack types).
        prioritize (str): Prioritization mechanism to use (e.g., recent, persistent).
        format_ (str): Desired format of the response (e.g., json, csv, txt).

    Query parameters (forwarded as a whole to ``apply_default_filters``):
        include_mass_scanners (bool): flag to include IOCs that are known mass scanners.
        include_tor_exit_nodes (bool): flag to include IOCs that are known tor exit nodes.

    Returns:
        Response: The HTTP response with formatted IOC data.
    """
    logger.info(
        f"request /api/feeds with params: feed type: {feed_type}, attack_type: {attack_type}, prioritization: {prioritize}, format: {format_}"
    )

    # Only the path parameters seed the request params; query-string filters
    # and the prioritization are applied on top afterwards.
    path_params = {"feed_type": feed_type, "attack_type": attack_type, "format_": format_}
    feed_params = FeedRequestParams(path_params)
    feed_params.apply_default_filters(request.query_params)
    feed_params.set_prioritization(prioritize)

    known_feed_types = get_valid_feed_types()
    return feeds_response(
        get_queryset(request, feed_params, known_feed_types),
        feed_params,
        known_feed_types,
    )

Advanced Feeds

Handle requests for IOC feeds based on query parameters and format the response accordingly.

Parameters:

Name Type Description Default
request

The incoming request object.

required
feed_type str

Type of feed to retrieve. (supported: cowrie, log4j, etc.; default: all)

required
attack_type str

Type of attack to filter. (supported: scanner, payload_request, all; default: all)

required
max_age int

Maximum number of days since last occurrence. E.g. an IOC that was last seen 4 days ago is excluded by default. (default: 3)

required
min_days_seen int

Minimum number of days on which an IOC must have been seen. (default: 1)

required
include_reputation str

;-separated list of reputation values to include, e.g. known attacker or known attacker; to include IOCs without reputation. (default: include all)

required
exclude_reputation str

;-separated list of reputation values to exclude, e.g. mass scanner or mass scanner;bot, crawler. (default: exclude none)

required
feed_size int

Number of IOC items to return. (default: 5000)

required
ordering str

Field to order results by, with optional - prefix for descending. (default: -last_seen)

required
verbose bool

true to include IOC properties that contain a lot of data, e.g. the list of days it was seen. (default: false)

required
paginate bool

true to paginate results. This forces the json format. (default: false)

required
format str

Response format type. Besides json, txt and csv are supported but the response will only contain IOC values (e.g. IP addresses) without further information. (default: json)

required

Returns:

Name Type Description
Response

The HTTP response with formatted IOC data.

Source code in docs/Submodules/GreedyBear/api/views/feeds.py
@api_view([GET])
@authentication_classes([CookieTokenAuthentication])
@permission_classes([IsAuthenticated])
def feeds_advanced(request):
    """
    Serve an IOC feed configured entirely through query parameters.

    Args:
        request: The incoming request object.

    Query parameters (all passed to ``FeedRequestParams``):
        feed_type (str): Type of feed to retrieve. (supported: `cowrie`, `log4j`, etc.; default: `all`)
        attack_type (str): Type of attack to filter. (supported: `scanner`, `payload_request`, `all`; default: `all`)
        max_age (int): Maximum number of days since last occurrence. E.g. an IOC that was last seen 4 days ago is excluded by default. (default: 3)
        min_days_seen (int): Minimum number of days on which an IOC must have been seen. (default: 1)
        include_reputation (str): `;`-separated list of reputation values to include, e.g. `known attacker` or `known attacker;` to include IOCs without reputation. (default: include all)
        exclude_reputation (str): `;`-separated list of reputation values to exclude, e.g. `mass scanner` or `mass scanner;bot, crawler`. (default: exclude none)
        feed_size (int): Number of IOC items to return. (default: 5000)
        ordering (str): Field to order results by, with optional `-` prefix for descending. (default: `-last_seen`)
        verbose (bool): `true` to include IOC properties that contain a lot of data, e.g. the list of days it was seen. (default: `false`)
        paginate (bool): `true` to paginate results. This forces the json format. (default: `false`)
        format (str): Response format type. Besides `json`, `txt` and `csv` are supported but the response will only contain IOC values (e.g. IP addresses) without further information. (default: `json`)

    Returns:
        Response: The HTTP response with formatted IOC data.
    """
    logger.info(f"request /api/feeds/advanced/ with params: {request.query_params}")

    feed_params = FeedRequestParams(request.query_params)
    known_feed_types = get_valid_feed_types()
    queryset = get_queryset(request, feed_params, known_feed_types)

    # Both flags are compared against the literal string "true".
    want_verbose = feed_params.verbose == "true"
    if feed_params.paginate != "true":
        return feeds_response(queryset, feed_params, known_feed_types, verbose=want_verbose)

    # Pagination always answers in JSON, whatever format was requested.
    feed_params.format = "json"
    pager = CustomPageNumberPagination()
    page = pager.paginate_queryset(queryset, request)
    payload = feeds_response(page, feed_params, known_feed_types, dict_only=True, verbose=want_verbose)
    return pager.get_paginated_response(payload)

Feeds Pagination

Handle requests for paginated IOC feeds based on query parameters.

Parameters:

Name Type Description Default
request

The incoming request object.

required

Returns:

Name Type Description
Response

The paginated HTTP response with IOC data.

Source code in docs/Submodules/GreedyBear/api/views/feeds.py
@api_view([GET])
def feeds_pagination(request):
    """
    Serve a paginated IOC feed configured through query parameters.

    The response format is always forced to JSON. The ``prioritize`` query
    parameter (possibly absent, i.e. ``None``) is handed to ``set_prioritization``.

    Args:
        request: The incoming request object.

    Returns:
        Response: The paginated HTTP response with IOC data.
    """
    query = request.query_params
    logger.info(f"request /api/feeds with params: {query}")

    feed_params = FeedRequestParams(query)
    feed_params.format = "json"
    feed_params.apply_default_filters(query)
    feed_params.set_prioritization(query.get("prioritize"))

    known_feed_types = get_valid_feed_types()
    queryset = get_queryset(request, feed_params, known_feed_types)

    pager = CustomPageNumberPagination()
    page = pager.paginate_queryset(queryset, request)
    payload = feeds_response(page, feed_params, known_feed_types, dict_only=True)
    return pager.get_paginated_response(payload)

Cowrie Session

Retrieve Cowrie honeypot session data including command sequences, credentials, and session details. Queries can be performed using either an IP address to find all sessions from that source, or a SHA-256 hash to find sessions containing a specific command sequence.

Parameters:

Name Type Description Default
request

The HTTP request object containing query parameters

required
query (str, required)

The search term, can be either an IP address or the SHA-256 hash of a command sequence. SHA-256 hashes should match command sequences generated using Python's "\n".join(sequence) format.

required
include_similar bool

When "true", expands the result to include all sessions that executed command sequences belonging to the same cluster(s) as command sequences found in the initial query result. Requires CLUSTER_COWRIE_COMMAND_SEQUENCES enabled in configuration. Default: false

required
include_credentials bool

When "true", includes all credentials used across matching Cowrie sessions. Default: false

required
include_session_data bool

When "true", includes detailed information about matching Cowrie sessions. Default: false

required

Returns:

Name Type Description
Response 200

JSON object containing: - query (str): The original query parameter - commands (list[str]): Unique command sequences (newline-delimited strings) - sources (list[str]): Unique source IP addresses - credentials (list[str], optional): Unique credentials if include_credentials=true - sessions (list[dict], optional): Session details if include_session_data=true - time (datetime): Session start time - duration (float): Session duration in seconds - source (str): Source IP address - interactions (int): Number of interactions in session - credentials (list[str]): Credentials used in this session - commands (str): Command sequence executed (newline-delimited)

Response 400

Bad Request - Missing or invalid query parameter

Response 404

Not Found - No matching sessions found

Response 500

Internal Server Error - Unexpected error occurred

Example Queries

/api/cowrie_session?query=1.2.3.4 /api/cowrie_session?query=5120e94e366ec83a79ee80454e4d1c76c06499ab19032bcdc7f0b4523bdb37a6 /api/cowrie_session?query=1.2.3.4&include_credentials=true&include_session_data=true&include_similar=true

Source code in docs/Submodules/GreedyBear/api/views/cowrie_session.py
@api_view([GET])
@authentication_classes([CookieTokenAuthentication])
@permission_classes([IsAuthenticated])
def cowrie_session_view(request):
    """
    Retrieve Cowrie honeypot session data including command sequences, credentials, and session details.
    Queries can be performed using either an IP address to find all sessions from that source,
    or a SHA-256 hash to find sessions containing a specific command sequence.

    Every call is recorded as a Statistics entry before the query is validated,
    so rejected (400) and unmatched (404) requests are counted as well.

    Args:
        request: The HTTP request object containing query parameters
        query (str, required): The search term, can be either an IP address or the SHA-256 hash of a command sequence.
            SHA-256 hashes should match command sequences generated using Python's "\\n".join(sequence) format.
            The hash is lowercased before the lookup.
        include_similar (bool, optional): When "true", expands the result to include all sessions that executed
            command sequences belonging to the same cluster(s) as command sequences found in the initial query result.
            Requires CLUSTER_COWRIE_COMMAND_SEQUENCES enabled in configuration. Default: false
        include_credentials (bool, optional): When "true", includes all credentials used across matching Cowrie sessions.
            Default: false
        include_session_data (bool, optional): When "true", includes detailed information about matching Cowrie sessions.
            Default: false

    Returns:
        Response (200): JSON object containing:
            - license (str): The feeds license (FEEDS_LICENSE)
            - query (str): The original query parameter
            - commands (list[str]): Unique command sequences (newline-delimited strings)
            - sources (list[str]): Unique source IP addresses
            - credentials (list[str], optional): Unique credentials if include_credentials=true
            - sessions (list[dict], optional): Session details if include_session_data=true
                - time (datetime): Session start time
                - duration (float): Session duration in seconds
                - source (str): Source IP address
                - interactions (int): Number of interactions in session
                - credentials (list[str]): Credentials used in this session
                - commands (str): Command sequence executed (newline-delimited)
        Response (400): Bad Request - Missing or invalid query parameter
        Response (404): Not Found - No session with a positive duration for the IP,
            or no command sequence with the given hash
        Response (500): Internal Server Error - Unexpected error occurred

    Example Queries:
        /api/cowrie_session?query=1.2.3.4
        /api/cowrie_session?query=5120e94e366ec83a79ee80454e4d1c76c06499ab19032bcdc7f0b4523bdb37a6
        /api/cowrie_session?query=1.2.3.4&include_credentials=true&include_session_data=true&include_similar=true
    """
    observable = request.query_params.get("query")
    # Flags are only enabled by the (case-insensitive) literal "true".
    include_similar = request.query_params.get("include_similar", "false").lower() == "true"
    include_credentials = request.query_params.get("include_credentials", "false").lower() == "true"
    include_session_data = request.query_params.get("include_session_data", "false").lower() == "true"

    logger.info(f"Cowrie view requested by {request.user} for {observable}")
    source_ip = str(request.META["REMOTE_ADDR"])
    request_source = Statistics(source=source_ip, view=viewType.COWRIE_SESSION_VIEW.value)
    request_source.save()

    if not observable:
        return HttpResponseBadRequest("Missing required 'query' parameter")

    if is_ip_address(observable):
        sessions = CowrieSession.objects.filter(source__name=observable, duration__gt=0).prefetch_related("source", "commands")
        if not sessions.exists():
            raise Http404(f"No information found for IP: {observable}")

    elif is_sha256hash(observable):
        try:
            commands = CommandSequence.objects.get(commands_hash=observable.lower())
        except CommandSequence.DoesNotExist as exc:
            raise Http404(f"No command sequences found with hash: {observable}") from exc
        sessions = CowrieSession.objects.filter(commands=commands, duration__gt=0).prefetch_related("source", "commands")
    else:
        return HttpResponseBadRequest("Query must be a valid IP address or SHA-256 hash")

    if include_similar:
        # Widen the result to every session whose command sequence shares a
        # cluster with one of the sequences found so far.
        commands = {s.commands for s in sessions if s.commands}
        clusters = {cmd.cluster for cmd in commands if cmd.cluster is not None}
        related_sessions = CowrieSession.objects.filter(commands__cluster__in=clusters).prefetch_related("source", "commands")
        sessions = sessions.union(related_sessions)

    response_data = {
        "license": FEEDS_LICENSE,
        "query": observable,
    }

    unique_commands = {s.commands for s in sessions if s.commands}
    response_data["commands"] = sorted("\n".join(cmd.commands) for cmd in unique_commands)
    # NOTE(review): socket.inet_aton parses IPv4 strings only; if a source name
    # can ever be an IPv6 address this sort key raises OSError - confirm.
    response_data["sources"] = sorted({s.source.name for s in sessions}, key=socket.inet_aton)
    if include_credentials:
        # Sessions without credentials (None or empty) contribute nothing,
        # mirroring the per-session fallback used for "sessions" below.
        response_data["credentials"] = sorted(set(itertools.chain.from_iterable(s.credentials or [] for s in sessions)))
    if include_session_data:
        response_data["sessions"] = [
            {
                "time": s.start_time,
                "duration": s.duration,
                "source": s.source.name,
                "interactions": s.interaction_count,
                "credentials": s.credentials if s.credentials else [],
                "commands": "\n".join(s.commands.commands) if s.commands else "",
            }
            for s in sessions
        ]

    return Response(response_data, status=status.HTTP_200_OK)

Command Sequence

View function that handles command sequence queries based on IP addresses or SHA-256 hashes.

Retrieves and returns command sequences and related IOCs based on the query parameter. If IP address is given, returns all command sequences executed from this IP. If SHA-256 hash is given, returns details about the specific command sequence. Can include similar command sequences if requested.

Parameters:

Name Type Description Default
request

The HTTP request object containing query parameters

required
query str

The search term, can be either an IP address or a SHA-256 hash.

required
include_similar bool

When parameter is present, returns related command sequences based on clustering.

required

Returns:

Type Description

Response object with command sequence data or an error response

Raises:

Type Description
Http404

If the requested resource is not found

Source code in docs/Submodules/GreedyBear/api/views/command_sequence.py
@api_view([GET])
@authentication_classes([CookieTokenAuthentication])
@permission_classes([IsAuthenticated])
def command_sequence_view(request):
    """
    View function that handles command sequence queries based on IP addresses or SHA-256 hashes.

    Retrieves and returns command sequences and related IOCs based on the query parameter.
    If IP address is given, returns all command sequences executed from this IP.
    If SHA-256 hash is given, returns details about the specific command sequence.
    Can include similar command sequences if requested.

    Every call is recorded as a Statistics entry before the query is validated.

    Args:
        request: The HTTP request object containing query parameters
        query (str): The search term, can be either an IP address or a SHA-256 hash.
            The hash is lowercased before the lookup.
        include_similar (bool): When parameter is present (with any value), returns related
            command sequences based on clustering. If none of the matched sequences belongs
            to a cluster, the exact-match result is returned instead.

    Returns:
        Response object with command sequence data or an error response:
            - IP query: ``license``, ``executed_commands`` (time, command_sequence,
              command_sequence_hash per session) and ``executed_by`` (sorted IOC names)
            - hash query: ``license``, ``commands`` and ``iocs`` (time, ip), newest first
            - HTTP 400 when ``query`` is missing or is neither an IP address nor a SHA-256 hash

    Raises:
        Http404: If the requested resource is not found
    """
    observable = request.query_params.get("query")
    include_similar = request.query_params.get("include_similar") is not None
    logger.info(f"Command Sequence view requested by {request.user} for {observable}")
    source_ip = str(request.META["REMOTE_ADDR"])
    request_source = Statistics(source=source_ip, view=viewType.COMMAND_SEQUENCE_VIEW.value)
    request_source.save()

    if not observable:
        return HttpResponseBadRequest("Missing required 'query' parameter")

    if is_ip_address(observable):
        sessions = CowrieSession.objects.filter(source__name=observable, start_time__isnull=False, commands__isnull=False)
        seqs = [
            {
                "time": s.start_time,
                "command_sequence": "\n".join(s.commands.commands),
                "command_sequence_hash": s.commands.commands_hash,
            }
            for s in sessions
        ]
        # Bail out before preparing any IOC lookup when nothing was executed.
        if not seqs:
            raise Http404(f"No command sequences found for IP: {observable}")

        sequences = {s.commands for s in sessions}
        related_clusters = {seq.cluster for seq in sequences if seq.cluster is not None} if include_similar else set()
        if related_clusters:
            related_iocs = IOC.objects.filter(cowriesession__commands__cluster__in=related_clusters).distinct().only("name")
        else:
            # Without any cluster, fall back to exact matches (same fallback as
            # the hash branch) rather than filtering on an empty cluster set,
            # which would always yield an empty "executed_by".
            related_iocs = IOC.objects.filter(cowriesession__commands__in=sequences).distinct().only("name")
        data = {
            "license": FEEDS_LICENSE,
            "executed_commands": seqs,
            "executed_by": sorted([ioc.name for ioc in related_iocs]),
        }
        return Response(data, status=status.HTTP_200_OK)

    if is_sha256hash(observable):
        # Only the lookup itself can raise DoesNotExist, so keep the try minimal.
        # The hash is lowercased for consistency with the Cowrie session view.
        try:
            seq = CommandSequence.objects.get(commands_hash=observable.lower())
        except CommandSequence.DoesNotExist as exc:
            raise Http404(f"No command sequences found with hash: {observable}") from exc

        seqs = CommandSequence.objects.filter(cluster=seq.cluster) if include_similar and seq.cluster is not None else [seq]
        commands = ["\n".join(member.commands) for member in seqs]
        sessions = CowrieSession.objects.filter(commands__in=seqs, start_time__isnull=False)
        iocs = [
            {
                "time": s.start_time,
                "ip": s.source.name,
            }
            for s in sessions
        ]
        data = {
            "license": FEEDS_LICENSE,
            "commands": commands,
            "iocs": sorted(iocs, key=lambda d: d["time"], reverse=True),
        }
        return Response(data, status=status.HTTP_200_OK)

    return HttpResponseBadRequest("Query must be a valid IP address or SHA-256 hash")

General Honeypot List

Retrieve a list of all general honeypots, optionally filtering by active status.

Parameters:

Name Type Description Default
request

The incoming request object containing query parameters.

required

Returns:

Name Type Description
Response

A JSON response containing the list of general honeypots.

Source code in docs/Submodules/GreedyBear/api/views/general_honeypot.py
@api_view([GET])
def general_honeypot_list(request):
    """
    Return the names of all general honeypots, optionally only the active ones.

    Args:
        request: The incoming request object containing query parameters.
            ``onlyActive=true`` restricts the list to honeypots with ``active=True``;
            any other value (or no value) returns all of them.

    Returns:
        Response: A JSON response containing the list of general honeypot names.
    """
    logger.info(f"Requested general honeypots list from {request.user}.")

    queryset = GeneralHoneypot.objects.all()
    if request.query_params.get("onlyActive") == "true":
        queryset = queryset.filter(active=True)
        logger.info(f"Requested only active general honeypots from {request.user}")

    names = [hp.name for hp in queryset]
    logger.info(f"General honeypots: {names} given back to user {request.user}")
    return Response(names)

Statistics

Bases: ViewSet

A viewset for viewing statistics related to feeds and enrichment data.

Provides actions to retrieve statistics about the sources and downloads of feeds, as well as statistics on enrichment data.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
class StatisticsViewSet(viewsets.ViewSet):
    """
    Read-only statistics endpoints for feeds and enrichment usage.

    Exposes GET actions that aggregate request statistics (sources, downloads,
    requests) per time bucket, plus per-feed-type IOC counts.
    """

    @action(detail=True, methods=["GET"])
    def feeds(self, request, pk=None):
        """
        Aggregate feed usage: distinct sources or total downloads per time bucket.

        Args:
            request: The incoming request object.
            pk (str): The type of statistics to retrieve ("sources" or "downloads").

        Returns:
            Response: A JSON response containing the requested statistics, or an
            HTTP 500 response when ``pk`` is neither of the supported values.
        """
        if pk not in ("sources", "downloads"):
            logger.error("this is impossible. check the code")
            return HttpResponseServerError()

        feeds_only = Q(view=viewType.FEEDS_VIEW.value)
        if pk == "sources":
            annotations = {"Sources": Count("source", distinct=True, filter=feeds_only)}
        else:
            annotations = {"Downloads": Count("source", filter=feeds_only)}
        return self.__aggregation_response_static_statistics(annotations)

    @action(detail=True, methods=["get"])
    def enrichment(self, request, pk=None):
        """
        Aggregate enrichment usage: distinct sources or total requests per time bucket.

        Args:
            request: The incoming request object.
            pk (str): The type of statistics to retrieve ("sources" or "requests").

        Returns:
            Response: A JSON response containing the requested statistics, or an
            HTTP 500 response when ``pk`` is neither of the supported values.
        """
        if pk not in ("sources", "requests"):
            logger.error("this is impossible. check the code")
            return HttpResponseServerError()

        enrichment_only = Q(view=viewType.ENRICHMENT_VIEW.value)
        if pk == "sources":
            annotations = {"Sources": Count("source", distinct=True, filter=enrichment_only)}
        else:
            annotations = {"Requests": Count("source", filter=enrichment_only)}
        return self.__aggregation_response_static_statistics(annotations)

    @action(detail=False, methods=["get"])
    def feeds_types(self, request):
        """
        Count IOCs per feed type: Log4j, Cowrie and every active general honeypot.

        Args:
            request: The incoming request object.

        Returns:
            Response: A JSON response containing the feed type statistics.
        """
        # Built-in feeds first; general honeypots are appended afterwards and
        # would override these keys if a honeypot carried the same name.
        annotations = {
            "Log4j": Count("name", distinct=True, filter=Q(log4j=True)),
            "Cowrie": Count("name", distinct=True, filter=Q(cowrie=True)),
        }
        active_honeypots = GeneralHoneypot.objects.all().filter(active=True)
        # One counter per active general honeypot, keyed by its name. The Q
        # object is passed positionally, as Count's second argument.
        annotations.update(
            {hp.name: Count("name", Q(general_honeypot__name__iexact=hp.name.lower())) for hp in active_honeypots}
        )
        return self.__aggregation_response_static_ioc(annotations)

    def __aggregation_response_static_statistics(self, annotations: dict) -> Response:
        """
        Aggregate Statistics rows per time bucket using the given annotations.

        Args:
            annotations (dict): Dictionary containing the annotations for the query.

        Returns:
            Response: A JSON response containing the aggregated statistics.
        """
        since, granularity = self.__parse_range(self.request)
        recent = Statistics.objects.filter(request_date__gte=since)
        # Group by the truncated request date, then aggregate within each group.
        bucketed = recent.annotate(date=Trunc("request_date", granularity)).values("date")
        return Response(bucketed.annotate(**annotations))

    def __aggregation_response_static_ioc(self, annotations: dict) -> Response:
        """
        Aggregate IOC rows per time bucket using the given annotations.

        IOCs linked to an inactive general honeypot are excluded.

        Args:
            annotations (dict): Dictionary containing the annotations for the query.

        Returns:
            Response: A JSON response containing the aggregated IOC data.
        """
        since, granularity = self.__parse_range(self.request)
        recent = IOC.objects.filter(last_seen__gte=since).exclude(general_honeypot__active=False)
        # Group by the truncated last-seen date, then aggregate within each group.
        bucketed = recent.annotate(date=Trunc("last_seen", granularity)).values("date")
        return Response(bucketed.annotate(**annotations))

    @staticmethod
    def __parse_range(request):
        """
        Read the ``range`` query parameter (default ``"7d"``) and parse it.

        Args:
            request: The incoming request object.

        Returns:
            tuple: A tuple containing the delta time and basis for the query range,
            as produced by ``parse_humanized_range``.
        """
        query = request.GET
        range_str = query["range"] if "range" in query else "7d"
        return parse_humanized_range(range_str)

__aggregation_response_static_ioc(annotations)

Helper method to generate IOC response based on annotations.

Parameters:

Name Type Description Default
annotations dict

Dictionary containing the annotations for the query.

required

Returns:

Name Type Description
Response Response

A JSON response containing the aggregated IOC data.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
def __aggregation_response_static_ioc(self, annotations: dict) -> Response:
    """
    Aggregate IOC rows per time bucket using the given annotations.

    IOCs linked to an inactive general honeypot are excluded.

    Args:
        annotations (dict): Dictionary containing the annotations for the query.

    Returns:
        Response: A JSON response containing the aggregated IOC data.
    """
    since, granularity = self.__parse_range(self.request)
    recent = IOC.objects.filter(last_seen__gte=since).exclude(general_honeypot__active=False)
    # Group by the truncated last-seen date, then aggregate within each group.
    bucketed = recent.annotate(date=Trunc("last_seen", granularity)).values("date")
    return Response(bucketed.annotate(**annotations))

__aggregation_response_static_statistics(annotations)

Helper method to generate statistics response based on annotations.

Parameters:

Name Type Description Default
annotations dict

Dictionary containing the annotations for the query.

required

Returns:

Name Type Description
Response Response

A JSON response containing the aggregated statistics.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
def __aggregation_response_static_statistics(self, annotations: dict) -> Response:
    """
    Aggregate Statistics rows per time bucket using the given annotations.

    Args:
        annotations (dict): Dictionary containing the annotations for the query.

    Returns:
        Response: A JSON response containing the aggregated statistics.
    """
    since, granularity = self.__parse_range(self.request)
    recent = Statistics.objects.filter(request_date__gte=since)
    # Group by the truncated request date, then aggregate within each group.
    bucketed = recent.annotate(date=Trunc("request_date", granularity)).values("date")
    return Response(bucketed.annotate(**annotations))

__parse_range(request) staticmethod

Parse the range parameter from the request query string to determine the time range for the query.

Parameters:

Name Type Description Default
request

The incoming request object.

required

Returns:

Name Type Description
tuple

A tuple containing the delta time and basis for the query range.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
@staticmethod
def __parse_range(request):
    """
    Read the ``range`` query parameter (default ``"7d"``) and parse it.

    Args:
        request: The incoming request object.

    Returns:
        tuple: A tuple containing the delta time and basis for the query range,
        as produced by ``parse_humanized_range``.
    """
    query = request.GET
    range_str = query["range"] if "range" in query else "7d"
    return parse_humanized_range(range_str)

enrichment(request, pk=None)

Retrieve enrichment statistics, including the number of sources and requests.

Parameters:

Name Type Description Default
request

The incoming request object.

required
pk str

The type of statistics to retrieve (e.g., "sources", "requests").

None

Returns:

Name Type Description
Response

A JSON response containing the requested statistics.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
@action(detail=True, methods=["get"])
def enrichment(self, request, pk=None):
    """
    Aggregate enrichment usage: distinct sources or total requests per time bucket.

    Args:
        request: The incoming request object.
        pk (str): The type of statistics to retrieve ("sources" or "requests").

    Returns:
        Response: A JSON response containing the requested statistics, or an
        HTTP 500 response when ``pk`` is neither of the supported values.
    """
    if pk not in ("sources", "requests"):
        logger.error("this is impossible. check the code")
        return HttpResponseServerError()

    enrichment_only = Q(view=viewType.ENRICHMENT_VIEW.value)
    if pk == "sources":
        annotations = {"Sources": Count("source", distinct=True, filter=enrichment_only)}
    else:
        annotations = {"Requests": Count("source", filter=enrichment_only)}
    return self.__aggregation_response_static_statistics(annotations)

feeds(request, pk=None)

Retrieve feed statistics, including the number of sources and downloads.

Parameters:

Name Type Description Default
request

The incoming request object.

required
pk str

The type of statistics to retrieve (e.g., "sources", "downloads").

None

Returns:

Name Type Description
Response

A JSON response containing the requested statistics.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
@action(detail=True, methods=["GET"])
def feeds(self, request, pk=None):
    """
    Return feed statistics: either distinct sources or total downloads.

    Args:
        request: The incoming request object.
        pk (str): Which statistic to compute, ``"sources"`` or ``"downloads"``.

    Returns:
        Response: A JSON response containing the requested statistics, or a
        server error response when ``pk`` is neither of the supported values.
    """
    if pk not in ("sources", "downloads"):
        logger.error("this is impossible. check the code")
        return HttpResponseServerError()

    # Both statistics only look at rows recorded for the feeds view.
    feeds_only = Q(view=viewType.FEEDS_VIEW.value)
    if pk == "sources":
        annotations = {"Sources": Count("source", distinct=True, filter=feeds_only)}
    else:
        annotations = {"Downloads": Count("source", filter=feeds_only)}
    return self.__aggregation_response_static_statistics(annotations)

feeds_types(request)

Retrieve statistics for different types of feeds, including Log4j, Cowrie, and general honeypots.

Parameters:

Name Type Description Default
request

The incoming request object.

required

Returns:

Name Type Description
Response

A JSON response containing the feed type statistics.

Source code in docs/Submodules/GreedyBear/api/views/statistics.py
@action(detail=False, methods=["get"])
def feeds_types(self, request):
    """
    Return per-feed-type statistics for Log4j, Cowrie and every active general honeypot.

    Args:
        request: The incoming request object.

    Returns:
        Response: A JSON response containing the feed type statistics.
    """
    # Built-in feed types are flagged directly on the counted rows.
    feed_counts = {
        "Log4j": Count("name", distinct=True, filter=Q(log4j=True)),
        "Cowrie": Count("name", distinct=True, filter=Q(cowrie=True)),
    }
    # One extra counter per active general honeypot, keyed by the honeypot's name.
    active_honeypots = GeneralHoneypot.objects.all().filter(active=True)
    feed_counts.update(
        {
            honeypot.name: Count("name", Q(general_honeypot__name__iexact=honeypot.name.lower()))
            for honeypot in active_honeypots
        }
    )
    return self.__aggregation_response_static_ioc(feed_counts)

Serializers

EnrichmentSerializer

Bases: Serializer

Source code in docs/Submodules/GreedyBear/api/serializers.py
class EnrichmentSerializer(serializers.Serializer):
    """Validate an enrichment query and attach the matching IOC, if any.

    ``query`` is the only writable field; ``found`` and ``ioc`` are read-only
    and are filled in by ``validate``.
    """

    found = serializers.BooleanField(read_only=True, default=False)
    ioc = IOCSerializer(read_only=True, default=None)
    query = serializers.CharField(max_length=250)

    def validate(self, data):
        """
        Check that the observable is an IP address or a domain, then look it up.

        Args:
            data (dict): The validated field values; must contain ``query``.

        Returns:
            dict: ``data`` with ``found`` set, plus ``ioc`` when a matching IOC exists.

        Raises:
            serializers.ValidationError: If the observable matches neither
                ``REGEX_IP`` nor ``REGEX_DOMAIN``.
        """
        observable = data["query"]
        # Reject only when BOTH patterns fail. Joining the two negated checks with
        # `or` demanded a match on both patterns at once, which refused observables
        # that are valid under just one of them.
        if not (re.match(REGEX_IP, observable) or re.match(REGEX_DOMAIN, observable)):
            raise serializers.ValidationError("Observable is not a valid IP or domain")
        try:
            required_object = IOC.objects.get(name=observable)
        except IOC.DoesNotExist:
            data["found"] = False
        else:
            data["found"] = True
            data["ioc"] = required_object
        return data

validate(data)

Check a given observable against the IP and domain regular expressions, then look up the matching IOC.

Source code in docs/Submodules/GreedyBear/api/serializers.py
def validate(self, data):
    """
    Check that the observable is an IP address or a domain, then look it up.

    Args:
        data (dict): The validated field values; must contain ``query``.

    Returns:
        dict: ``data`` with ``found`` set, plus ``ioc`` when a matching IOC exists.

    Raises:
        serializers.ValidationError: If the observable matches neither
            ``REGEX_IP`` nor ``REGEX_DOMAIN``.
    """
    observable = data["query"]
    # Reject only when BOTH patterns fail. Joining the two negated checks with
    # `or` demanded a match on both patterns at once, which refused observables
    # that are valid under just one of them.
    if not (re.match(REGEX_IP, observable) or re.match(REGEX_DOMAIN, observable)):
        raise serializers.ValidationError("Observable is not a valid IP or domain")
    try:
        required_object = IOC.objects.get(name=observable)
    except IOC.DoesNotExist:
        data["found"] = False
    else:
        data["found"] = True
        data["ioc"] = required_object
    return data

FeedsRequestSerializer

Bases: Serializer

Source code in docs/Submodules/GreedyBear/api/serializers.py
class FeedsRequestSerializer(serializers.Serializer):
    """Serializer describing the fields of a feeds request.

    The serializer context must provide ``valid_feed_types``; it is read by
    ``validate_feed_type``.
    """

    feed_type = serializers.CharField(max_length=120)
    attack_type = serializers.ChoiceField(choices=["scanner", "payload_request", "all"])
    max_age = serializers.IntegerField(min_value=1)
    min_days_seen = serializers.IntegerField(min_value=1)
    include_reputation = serializers.ListField(child=serializers.CharField(max_length=120))
    exclude_reputation = serializers.ListField(child=serializers.CharField(max_length=120))
    feed_size = serializers.IntegerField(min_value=1)
    ordering = serializers.CharField(max_length=120)
    # The two flags below are validated as the literal strings "true"/"false", not as booleans.
    verbose = serializers.ChoiceField(choices=["true", "false"])
    paginate = serializers.ChoiceField(choices=["true", "false"])
    format = serializers.ChoiceField(choices=["csv", "json", "txt"])

    def validate_feed_type(self, feed_type):
        """Validate ``feed_type`` against the ``valid_feed_types`` entry of the serializer context."""
        logger.debug(f"FeedsRequestSerializer - validation feed_type: '{feed_type}'")
        return feed_type_validation(feed_type, self.context["valid_feed_types"])

    def validate_ordering(self, ordering):
        """Validate ``ordering`` by delegating to ``ordering_validation``."""
        logger.debug(f"FeedsRequestSerializer - validation ordering: '{ordering}'")
        return ordering_validation(ordering)

Note: Enhanced with additional filter fields (ioc_type, etc.) in version > 2.4.0

FeedsResponseSerializer

Bases: Serializer

Source code in docs/Submodules/GreedyBear/api/serializers.py
class FeedsResponseSerializer(serializers.Serializer):
    """Serializer describing a single entry of a feeds response.

    The serializer context must provide ``valid_feed_types``; it is read by
    ``validate_feed_type`` for every element of ``feed_type``.
    """

    feed_type = serializers.ListField(child=serializers.CharField(max_length=120))
    value = serializers.CharField(max_length=256)
    scanner = serializers.BooleanField()
    payload_request = serializers.BooleanField()
    first_seen = serializers.DateField(format="%Y-%m-%d")
    last_seen = serializers.DateField(format="%Y-%m-%d")
    attack_count = serializers.IntegerField(min_value=1)
    interaction_count = serializers.IntegerField(min_value=1)
    ip_reputation = serializers.CharField(allow_blank=True, max_length=32)
    asn = serializers.IntegerField(allow_null=True, min_value=1)
    destination_port_count = serializers.IntegerField(min_value=0)
    login_attempts = serializers.IntegerField(min_value=0)
    recurrence_probability = serializers.FloatField(min_value=0, max_value=1)
    expected_interactions = serializers.FloatField(min_value=0)

    def validate_feed_type(self, feed_type):
        """Validate each listed feed type and return the validated values in order."""
        logger.debug(f"FeedsResponseSerializer - validation feed_type: '{feed_type}'")
        checked = []
        for entry in feed_type:
            checked.append(feed_type_validation(entry, self.context["valid_feed_types"]))
        return checked

IOCSerializer

Bases: ModelSerializer

Source code in docs/Submodules/GreedyBear/api/serializers.py
class IOCSerializer(serializers.ModelSerializer):
    """Model serializer for ``IOC`` that leaves out the ``related_urls`` field."""

    # Nested, read-only list; each entry is rendered by GeneralHoneypotSerializer.
    general_honeypot = GeneralHoneypotSerializer(many=True, read_only=True)

    class Meta:
        model = IOC
        exclude = [
            "related_urls",
        ]

Note: Includes FireHol categories field in version > 2.4.0

GeneralHoneypotSerializer

Bases: ModelSerializer

Source code in docs/Submodules/GreedyBear/api/serializers.py
class GeneralHoneypotSerializer(serializers.ModelSerializer):
    """Model serializer for ``GeneralHoneypot`` that renders an instance as its name."""

    class Meta:
        model = GeneralHoneypot

    def to_representation(self, value):
        """Return the ``name`` attribute of ``value`` as its serialized form."""
        return value.name

Validation Functions

feed_type_validation

Validates that a given feed type exists in the set of valid feed types.

Parameters:

Name Type Description Default
feed_type str

The feed type to validate

required
valid_feed_types frozenset

Set of allowed feed type values

required

Returns:

Name Type Description
str str

The validated feed type string, unchanged

Raises:

Type Description
ValidationError

If feed_type is not found in valid_feed_types

Source code in docs/Submodules/GreedyBear/api/serializers.py
def feed_type_validation(feed_type: str, valid_feed_types: frozenset) -> str:
    """Check a feed type against the allowed set and hand it back untouched.

    Args:
        feed_type (str): The feed type to check
        valid_feed_types (frozenset): The feed type values that are accepted

    Returns:
        str: ``feed_type`` itself when it is one of ``valid_feed_types``

    Raises:
        serializers.ValidationError: If ``feed_type`` is not a member of ``valid_feed_types``
    """
    if feed_type in valid_feed_types:
        return feed_type
    # Unknown feed type: log the rejected value together with the allowed set, then fail.
    logger.info(f"Feed type {feed_type} not in feed_choices {valid_feed_types}")
    raise serializers.ValidationError(f"Invalid feed_type: {feed_type}")

ordering_validation

Validates that given ordering corresponds to a field in the IOC model.

Parameters:

Name Type Description Default
ordering str

The ordering to validate

required

Returns:

Name Type Description
str str

The validated ordering string, unchanged

Raises:

Type Description
ValidationError

If ordering does not correspond to a field in the IOC model

Source code in docs/Submodules/GreedyBear/api/serializers.py
@cache
def ordering_validation(ordering: str) -> str:
    """Check that an ordering string names a field of the IOC model.

    A single leading ``-`` is ignored for the lookup.

    Args:
        ordering (str): The ordering to check

    Returns:
        str: ``ordering`` itself, including any leading ``-``

    Raises:
        serializers.ValidationError: If ``ordering`` is empty or does not name a field of the IOC model
    """
    if not ordering:
        raise serializers.ValidationError("Invalid ordering: <empty string>")

    # Strip the minus sign, if any, before asking the model for the field.
    field_name = ordering
    if field_name.startswith("-"):
        field_name = field_name[1:]

    try:
        IOC._meta.get_field(field_name)
    except FieldDoesNotExist as exc:
        raise serializers.ValidationError(f"Invalid ordering: {ordering}") from exc
    return ordering