Threat Exchange Custom Plugin Developers Guide
This document explains how to write a new Threat Exchange plugin and extract maximum value out of your threat ecosystem by leveraging the functionality provided within the Threat Exchange module.
Prerequisites
To create a new plugin, you need:
Python 3.x programming experience (intermediate level).
Access to the Netskope Cloud Exchange platform.
API or Python SDK access to the product or solution for which you need to write the plugin.
An account with minimum permission for the product.
The Cloud Exchange (CE) platform, and its Threat Exchange module, come with a rich set of features and functionality that allow for a high degree of customization, so we recommend that you familiarize yourself with the different aspects of the platform as listed below.
Note
This module supports sharing data from Netskope to third parties and vice versa.
Core: CE core engine manages the 3rd-party plugins and their life cycle methods, plus has API endpoints for interacting with the platform to perform various tasks.
Module: The functional code areas that invoke module-specific plugins to accomplish different workflows. Threat Exchange is one of the modules in Cloud Exchange.
Plugin: Plugins are Python packages that have logic to pull and push Threat IoC information to/from 3rd-party Threat Intel systems, which will then be stored in the Threat Exchange.
Plugin Configurations: Plugin configurations are the plugin class objects configured with the required parameters and scheduled by the Cloud Exchange core engine for pulling and pushing Threat IoC information.
Indicators (Threat IoCs): Indicators are malware hash and malsite URL objects gathered from various Threat Intel Platforms and stored in the Cloud Exchange database.
Use the Package Directory Structure for all Python code.
Make sure all the 3rd-party libraries packaged with the plugin package are checked for known vulnerabilities.
Make sure to follow standard Python code conventions: PEP 8 – Style Guide for Python Code.
Run and verify that the flake8 lint check passes with the docstring check enabled. Also, the maximum length of a line should be 80.
Convert the time-stamp values to the human-readable format (from epoch to DateTime object).
Make sure that times displayed in the UI are in the local timezone.
If possible, add a default value while adding a configuration parameter in the plugin.
For Scripts/Integrations written in Python, make sure to create unit tests. Refer to the below section Unit Testing.
Plugin architecture allows storing states; however, avoid storing huge objects for state management.
Check your python code for any vulnerabilities.
The plugin icon has to be under 10KB. Make sure to use the company logo (and not the product’s) with a transparent background. The recommended size for the logo is 300 x 50 pixels or a similar aspect ratio.
Use the checkpoint provided by the Threat Exchange core rather than implementing one on your own.
Follow the Plugin Directory Structure.
If the plugin description contains a link, it should be a hyperlink redirecting to the documentation page. Refer to the CTE Trend Micro Vision One plugin for details.
The logger messages and the Toast messages should not contain the API Token and the Password type field values.
Pagination should always be considered while developing any feature in a plugin.
Make sure to add a retry mechanism for the 429 status code.
Make sure to map the comment field which gives more context to the SOC analyst. The comment field could include file name (in the case of Hash).
Use the checkpoint provided by the CE core rather than implementing one on your own.
Always check if the plugin supports the push mechanism or not and set the push_supported variable accordingly in manifest.json.
Use proper validation for the parameters passed to the validate method and provide the proper help text for all the parameters.
Use notifier object to raise the notification for failures or critical situations (like rate-limiting, or exceeding payload size) to notify the status of the plugin to the user.
Make sure to implement a proper logging mechanism with the logger object passed by the Cloud Exchange platform. Make sure enough logging is done which helps the Ops team in troubleshooting. Make sure any sensitive data is not logged or leaked in the notification.
Provide the proper help text (tooltip) for all the parameters. If feasible, make sure to explain the significance of the parameter in the tooltip.
There should be a placeholder for text type of configuration parameters.
Make sure to provide a meaningful name and description to plugin configuration parameters.
Make sure to provide an appropriate configuration type (text, number, password, choice, multi-choice) to the configuration parameters.
Make sure to use the proxy configuration dict and the SSL certificate validation flag that is passed by the CE platform while making any outbound request (API/SDK).
Make sure to collect the value of a non-mandatory parameter using the .get() method and provide a default value while using .get() method.
Make sure the plugin directory name (e.g., sample_plugin) matches the manifest.json’s id field.
User Agent should be added to the headers while making any API call. Format for the User Agent: netskope-ce-<ce_version>-<module>-<plugin_name>-<plugin_version>. Refer to URE Microsoft Azure AD or URE Crowdstrike Identity Protect plugin for more details.
Note
The plugin version should be fetched dynamically rather than hard-coded. To fetch the netskope-ce-<version> string, use the method defined by the core.
API Tokens and Password fields should not use strip().
The log messages should start with "<module> <app name> Plugin [configuration_name]: ".
Example: "URE CrowdStrike Plugin [CrowdStrike Configuration Name]: <log_message>". Including the configuration name is a suggestion and can be omitted, for example logger.info("<module> <plugin_name> Plugin: <message>").
When logging an error, add the traceback of the exception where possible.
USE: self.logger.error(error, details=traceback.format_exc()).
The Toast message should not contain the "<app_name> <module> Plugin:" in the message.
Make sure to catch proper exceptions and status codes while and after making the API calls.
The CHANGELOG.md file should be updated with proper tags such as Added, Changed, and Fixed along with a proper user-friendly message. Make sure the file name exactly matches CHANGELOG.md.
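The retry recommendation for the 429 status code can be sketched as follows. This is a minimal illustration, not the platform's implementation: call_with_retry, MAX_RETRIES, and DEFAULT_WAIT_SECONDS are hypothetical names, and the send callable stands in for whatever outbound request the plugin makes.

```python
import time
from typing import Callable

MAX_RETRIES = 3           # hypothetical constant
DEFAULT_WAIT_SECONDS = 2  # hypothetical constant


def call_with_retry(send: Callable, max_retries: int = MAX_RETRIES,
                    sleep: Callable = time.sleep):
    """Call send() and retry while the response status code is 429.

    send must return an object exposing status_code and headers, such as a
    requests.Response. A numeric Retry-After header is honoured when
    present; otherwise a fallback wait is used. The fallback wait doubles
    after every attempt.
    """
    wait = DEFAULT_WAIT_SECONDS
    response = send()
    for _ in range(max_retries):
        if response.status_code != 429:
            break
        retry_after = str(response.headers.get("Retry-After", ""))
        sleep(int(retry_after) if retry_after.isdigit() else wait)
        wait *= 2
        response = send()
    # The caller still has to handle a 429 that survived every retry.
    return response
```

Inside a plugin, send could be a lambda wrapping requests.get(url, headers=headers, proxies=self.proxy, verify=self.ssl_validation). If the final response is still 429, raising a notification through self.notifier is one way to tell the user about the rate limit.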
This section explains the process of writing a plugin from scratch.
Download the sample plugin from the NetskopeOSS public Github repository.
Development Setup
Threat Exchange utilizes Python3 (v3.7 and above). Make sure to set up python3 in your development environment. Pytest is used to run unit tests.
The following Python libraries are included within the Netskope Threat Exchange platform.
Library Name | Version |
---|---|
aiofiles | 22.1.0 |
amqp | 5.1.1 |
anyio | 3.6.2 |
asgiref | 3.6.0 |
attrs | 22.2.0 |
azure-core | 1.26.2 |
azure-storage-blob | 12.14.1 |
bcrypt | 4.0.1 |
boto3 | 1.26.51 |
botocore | 1.29.51 |
billiard | 3.6.4.0 |
celery | 5.2.7 |
cabby | 0.1.23 |
cachetools | 5.2.1 |
celerybeat-mongo | 0.2.0 |
certifi | 2022.12.7 |
cffi | 1.13.2 |
chardet | 5.1.0 |
charset-normalizer | 3.0.1 |
click | 8.1.3 |
click-didyoumean | 0.3.0 |
click-plugins | 1.1.1 |
click-repl | 0.2.0 |
colorama | 0.4.6 |
colorlog | 6.7.0 |
cryptography | 39.0.0 |
cybox | 2.1.0.21 |
defusedxml | 0.7.1 |
dnspython | 2.3.0 |
docker | 6.0.1 |
fastapi | 0.89.1 |
furl | 2.1.3 |
google-api-core | 2.11.0 |
google-auth | 2.16.0 |
google-cloud-core | 2.3.2 |
google-cloud-pubsub | 2.13.12 |
google-cloud-pubsublite | 1.6.0 |
google-cloud-storage | 2.7.0 |
google-crc32c | 1.5.0 |
google-resumable-media | 2.4.0 |
googleapis-common-protos | 1.58.0 |
grpc-google-iam-v1 | 0.12.6 |
grpcio | 1.51.1 |
grpcio-status | 1.51.1 |
gunicorn | 20.1.0 |
h11 | 0.14.0 |
idna | 3.4 |
importlib-metadata | 6.0.0 |
isodate | 0.6.1 |
jmespath | 1.0.1 |
jsonpath | 0.8 |
jsonschema | 4.17.3 |
kombu | 5.2.4 |
libcst | 0.3.21 |
libtaxii | 1.1.119 |
lxml | 4.9.2 |
mongoengine | 0.25.0 |
mongoquery | 1.4.2 |
more-itertools | 9.0.0 |
MarkupSafe | 2.1.2 |
memory-profiler | 0.61.0 |
mixbox | 1.0.5 |
msrest | 0.7.1 |
multidict | 6.0.4 |
mypy-extensions | 0.4.3 |
netskopesdk | 0.0.25 |
numpy | 1.23.5 |
oauthlib | 3.2.2 |
onelogin | 3.1.0 |
ordered-set | 4.1.0 |
orderedmultidict | 1.0.1 |
overrides | 6.5.0 |
pandas | 1.5.0 |
packaging | 23.0 |
passlib | 1.7.4 |
pycparser | 2.21 |
prompt-toolkit | 3.0.36 |
proto-plus | 1.22.2 |
protobuf | 4.21.12 |
psutil | 5.9.4 |
pydantic | 1.10.4 |
pyasn1 | 0.4.8 |
pyasn1-modules | 0.2.8 |
PyJWT | 2.6.0 |
pymongo | 4.3.4 |
pyparsing | 3.0.9 |
python-dateutil | 2.8.2 |
pyrsistent | 0.15.6 |
python-multipart | 0.0.5 |
python3-saml | 1.15.0 |
pytz | 2022.7.1 |
PyYAML | 6.0 |
requests | 2.28.2 |
requests-oauthlib | 1.3.1 |
rsa | 4.9 |
six | 1.16.0 |
starlette | 0.22.0 |
sniffio | 1.3.0 |
s3transfer | 0.6.0 |
stix | 1.2.0.11 |
taxii2-client | 2.3.0 |
typing-inspect | 0.8.0 |
typing-utils | 0.1.0 |
typing_extensions | 4.4.0 |
urllib3 | 1.26.14 |
uvicorn | 0.20.0 |
vine | 5.0.0 |
wcwidth | 0.2.6 |
weakrefmethod | 1.0.3 |
websocket-client | 1.4.2 |
Werkzeug | 2.2.2 |
xmlsec | 1.3.11 |
zipp | 3.11.0 |
requests-mock | 1.7.0 |
Netskope advises bundling any of the third party python libraries your plugin will need with the plugin package itself. To achieve this bundling use the pip installer; it provides a switch which takes a directory as an input. If it is provided, pip will install the packages into that directory.
For example, the command shown below will install the cowsay package into the directory lib.
> pip install cowsay --target ./lib
For the official documentation on this, refer to https://pip.pypa.io/en/stable/reference/pip_install/#cmdoption-t.
When importing modules from the lib folder above, use a relative import instead of an absolute import, as shown below:
from .lib import cowsay
Recommended IDEs are PyCharm or Visual Studio Code.
Plugin Directory Structure
This section describes the typical directory structure for the Threat Exchange plugin.
/sample_plugin/
├── __init__.py
├── icon.png
├── main.py
└── manifest.json
README.md: README file contains the documentation for the Plugin integration use-case.
__init__.py: Every plugin package is considered a python module by the Cloud Exchange code. Make sure every plugin package contains the empty “__init__.py” file.
CHANGELOG.md: This file contains the details about plugin update and should be updated with proper tags as Added, Changed, Fixed along with a proper user-friendly message
icon.png: Plugin icon logo, this will be visible in the plugin chiclet and configuration cards on the UI. The logo should have a transparent background with recommended size of 300*50 pixels or a similar aspect ratio.
main.py: This python file contains the Plugin class containing the concrete implementation for the pull, push and validate method.
manifest.json: Manifest file for the plugin package containing information about all the configurable parameters and their data types. This file has more information about the plugin integration as well.
The listed files here are mandatory for any plugin integration, but you can add other files based on specific integration requirements.
Note: Make sure the plugin directory name (e.g., sample_plugin) matches the manifest.json’s id field.
This is a file that contains details about plugin updates and should be updated with proper tags such as Added, Changed, and Fixed along with a proper user-friendly message.
Added: Use it when new features are added.
Fixed: Use it when a bug or error is fixed.
Changed: Use it when there is any change in the existing implementation of the plugin.
Sample Changelog.md
    # 1.0.1
    ## Fixed
    - Fixed pagination when there are more than 10k Logs

    # 1.0.0
    ## Added
    - Initial release.
This is a JSON file that stores the meta-information related to the plugin, which is then read by the Threat Exchange module to render the plugin in the UI, as well as enabling the Threat Exchange module to know more about the plugin, including required configuration parameters, the plugin-id, the plugin-name, etc.
Every plugin must contain this file with the required information so that Threat Exchange can instantiate the Plugin object properly.
Common parameters for manifest.json include:
name: (string) Name of the plugin. (Required)
id: (string) ID of the plugin package. Make sure it is unique across all the plugins installed in the Cloud Exchange. The ID has to match the directory name of the plugin package. (Required)
version: (string) Version of the plugin. Usage of a MAJOR.MINOR.PATCH (ex. 1.0.1) versioning scheme is encouraged although there are no restrictions. (Required)
description: (string) Description of the plugin. Provide a detailed description which mentions the capabilities and instructions to use the plugin, (ex. This plugin works with product foo and extracts both md5 and sha256 hashes as well as malURL to Threat Exchange and pushes the same to product foo.) This description would appear on the Plugin Configuration card. (Required)
push_supported: (boolean) This flag indicates whether the plugin supports the push method or not. If it is set to false then sharing related fields will not be displayed in the UI. (optional, defaults to true)
patch_supported: (boolean) This flag indicates whether the integrated product supports incrementally reporting indicators. Certain products (e.g. Netskope features using RESTAPIv1) expect all the indicators to be reported each time. In such cases, patch_supported must be set to false. In contrast, ServiceNow allows sharing indicators one at a time and retains the previously shared indicators, so in that case patch_supported must be set to true. (Required)
configuration: (array) Array of JSON objects that contains information about all the parameters required by the plugin - their name, type, id, etc. The common parameters for the nested JSON objects are explained below.
label: Name of the parameter. This will be displayed on the plugin configuration page. (Required)
key: Unique parameter key, which will be used as a key in the python dict object where the plugin configuration is used. (Required)
type: Value type of the parameter. Allowed values are 'text', 'password', 'number', 'choice', and ‘multichoice’. (Required) Refer to Plugin Configuration parameter types below for more details.
default: The default value for this parameter. This value will appear in the plugin configuration page on Threat Exchange UI. Supported data-types are "text", "number", and “list” (for multichoice type). (Required)
mandatory: Boolean which indicates whether this parameter is mandatory or not. If a parameter is mandatory Threat Exchange UI won't let you pass an empty value for the parameter. Allowed values are `true` and `false`. (Required)
description: Help text level description for the parameter which can give more details about the parameter and expected value. This string will appear in the plugin configuration page as a help-text. (Required)
choices: A list of JSON objects containing key and value as JSON keys. This parameter is supported only when type is 'choice' or 'multichoice'.
Make sure all the required plugin configuration parameters are listed under the configuration section of manifest.json for the plugin.
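The manifest rules above can be checked with a small script before packaging a plugin. The following is a sketch only: check_manifest is a hypothetical helper and not part of Cloud Exchange, and the required keys are taken from the "(Required)" markers in the list above.

```python
import json
from pathlib import Path

# Keys marked "(Required)" in the parameter list above.
REQUIRED_TOP_LEVEL = ("name", "id", "version", "description", "patch_supported")
REQUIRED_PARAM_KEYS = (
    "label", "key", "type", "default", "mandatory", "description",
)
ALLOWED_TYPES = {"text", "password", "number", "choice", "multichoice"}


def check_manifest(plugin_dir: str) -> list:
    """Return a list of human-readable problems found in manifest.json."""
    path = Path(plugin_dir)
    manifest = json.loads((path / "manifest.json").read_text(encoding="utf-8"))
    problems = [
        f"missing top-level key: {key}"
        for key in REQUIRED_TOP_LEVEL if key not in manifest
    ]
    # The id has to match the plugin directory name.
    if manifest.get("id") != path.name:
        problems.append("id does not match the plugin directory name")
    for param in manifest.get("configuration", []):
        name = param.get("key", "<unknown>")
        problems += [
            f"{name}: missing {key}"
            for key in REQUIRED_PARAM_KEYS if key not in param
        ]
        if param.get("type") not in ALLOWED_TYPES:
            problems.append(f"{name}: unsupported type {param.get('type')!r}")
    return problems
```

Calling check_manifest("sample_plugin") returns an empty list when nothing is flagged, which makes it easy to wire into a unit test.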
Use this parameter for storing any secrets/passwords for authentication with API endpoints. Parameters of type password are rendered as a password text box on the plugin configuration page and are obfuscated and encrypted by the platform.
Sample JSON
    "configuration": [
        {
            "label": "Api token",
            "key": "api_token",
            "type": "password"
        }
    ]
Plugin configuration view:
Use this parameter for storing any string information such as base-url, username, etc. This parameter will have a normal text input on the plugin configuration page.
Sample JSON
    "configuration": [
        {
            "label": "Tenant Name",
            "key": "tenant_name",
            "type": "text"
        }
    ]
Plugin configuration view:
Use this parameter for storing number/float values. This parameter will have a number input field on the plugin configuration page.
Sample JSON
    "configuration": [
        {
            "label": "Maximum File hash list size in MB.",
            "key": "max_size",
            "type": "number"
        }
    ]
Plugin configuration view:
Use this parameter for storing any enumeration parameter values. This parameter will have a dropdown on the plugin configuration page.
Sample JSON
    "configuration": [
        {
            "label": "Type of Threat data to pull",
            "key": "ioc_type",
            "type": "choice",
            "choices": [
                { "key": "Both", "value": ["malware", "malsite"] },
                { "key": "Malware", "value": "malware" },
                { "key": "Malsite", "value": "malsite" }
            ]
        }
    ]
Plugin configuration view:
After selecting the input.
Use this parameter for storing multiple choice values. This parameter will have a dropdown on the plugin configuration page with ability to select multiple values.
Sample JSON
    "configuration": [
        {
            "label": "Severity",
            "key": "severity",
            "type": "multichoice",
            "choices": [
                { "key": "Unknown", "value": "unknown" },
                { "key": "Low", "value": "low" },
                { "key": "Medium", "value": "medium" },
                { "key": "High", "value": "high" },
                { "key": "Critical", "value": "critical" }
            ],
            "default": ["critical", "high", "medium", "low", "unknown"],
            "mandatory": false,
            "description": "Only indicators with matching severity will be saved."
        }
    ]
Plugin Configuration View:
This parameter stores a boolean value, toggle enabled is True and toggle disabled is False.
Enable SSL Verification: This variable should be used while making any API call in plugin.
Use System Proxy (‘proxy’): Use the system proxy configured in Settings. (Default: False)
Plugin Configuration View:
Note
This parameter is provided by the core; it cannot be added from a plugin's manifest.json file.
This python file contains the core implementation of the plugin.
    from netskope.integrations.cte.plugin_base import PluginBase, ValidationResult, PushResult
    from netskope.integrations.cte.models import Indicator, IndicatorType
    from netskope.integrations.cte.models.business_rule import Action, ActionWithoutParams

PluginBase provides access to variables that can be used during the plugin lifecycle methods. Below is the list of variables.
Variable Name | Usage | Description |
---|---|---|
self.logger | self.logger.error("Message") self.logger.warn("Message") self.logger.info("Message") | Logger handle provided by core. Use this object to log important events. The logs are visible in the Cloud Exchange Audit logs. Refer to the Logging documentation. |
self.configuration | self.configuration.get(<attribute-key-name>) | JSON representation of the configuration object of the Plugin instance. Use this to access the configuration attributes like authentication credentials, server details, etc. Use the key name of the attribute mentioned in manifest.json. |
self.last_run_at | if self.last_run_at: self.last_run_at.timestamp() Use this format to convert the last run time to epoch format. | Provides the timestamp of the last successful run of the Plugin’s pull method. The Cloud Exchange core maintains the checkpoint time after each successful pull() execution. For the first execution, the value is None. The datatype of the object is datetime. |
self.storage | | Cloud Exchange provides the plugin a mechanism to maintain state. Use this object to persist any state that would be required during subsequent calls. The datatype of this object is python dict. |
self.notifier | self.notifier.info("message") self.notifier.warn("message") self.notifier.error("message") | This object provides a handle to the Cloud Exchange core’s notifier. Use this object to push any notification to the platform. The notifications are visible on the Threat Exchange UI. Make sure the message contains summarized information for the user to read and take necessary actions. For example, the Netskope plugin uses the notifier when the push() method exceeds the product's 8MB limit. |
self.proxy | requests.get(url=url, proxies=self.proxy) | Handle of system’s proxy settings if configured, else {}. |
self.ssl_validation | requests.get(url=url, verify=self.ssl_validation) | Boolean value that indicates whether SSL validation should be enforced for REST API calls. |
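As an illustration of the self.last_run_at checkpoint described in the table, the sketch below derives the start time for a pull. The function name get_start_epoch and the seven-day initial look-back are assumptions made for this example, not platform behaviour.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

INITIAL_RANGE_DAYS = 7  # hypothetical look-back used on the first run


def get_start_epoch(last_run_at: Optional[datetime],
                    initial_range_days: int = INITIAL_RANGE_DAYS,
                    now: Optional[datetime] = None) -> int:
    """Return the epoch second from which the next pull should start.

    last_run_at mirrors self.last_run_at: None on the first run, otherwise
    a datetime holding the time of the last successful pull.
    """
    if last_run_at is not None:
        return int(last_run_at.timestamp())
    # First run: fall back to a fixed look-back window.
    now = now or datetime.now(timezone.utc)
    return int((now - timedelta(days=initial_range_days)).timestamp())
```

In a plugin this would be called as get_start_epoch(self.last_run_at). Note that timestamp() interprets a naive datetime as local time, so it is worth confirming how the checkpoint value is stored before relying on the result.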
Plugin class has to be inherited from the PluginBase class. PluginBase class is defined in netskope.integrations.cte.plugin_base.
Make sure Plugin class provides implementation for the pull, push and validate method.
Plugin class will contain all the necessary params to establish connection and authentication with the 3rd-party API.
Constants like PLUGIN_NAME, LIMIT, etc. should be declared.
    """Sample plugin implementation.

    This is a sample implementation of the PluginBase class, which explains
    the concrete implementation of the base class.
    """

    from netskope.integrations.cte.plugin_base import PluginBase, ValidationResult, PushResult
    from netskope.integrations.cte.models import Indicator, IndicatorType
    from typing import List
    from datetime import datetime
    import requests

    PLUGIN_NAME = "<module> <plugin_name> Plugin"


    class SamplePlugin(PluginBase):
        """SamplePlugin class having concrete implementation for pulling and pushing threat information.

        This class is responsible for implementing pull, push and validate
        methods with proper return types, so that its lifecycle execution can
        be scheduled by the CTE core engine.
        """
This is an abstract method of PluginBase Class.
This method implements the logic to pull the Threat IOCs (Malware & Malsites) from the API endpoints. This method is invoked periodically.
Make sure it's unit-testable.
Use the checkpoint passed by the Cloud Exchange platform by invoking self.last_run_at. It returns the datetime.datetime Python object containing the timestamp when this method was last executed successfully.
Use the proxy configuration passed by the Cloud Exchange platform by invoking self.proxy. It returns the python dict object which can be used directly with the requests module.
All the configuration parameters for API authentication are passed as a python dict; access them by invoking self.configuration.
All logs should be written through the self.logger object with the proper log level (info, warn, error). This object writes the logs to MongoDB, and they can be accessed via API calls.
Use self.ssl_validation bool to enable/disable validation of the SSL server certificate.
Return the list of Indicator objects (Refer to Indicator Data Model) which contain the data received from the API endpoint.
In the case of failure raise an error or exception of appropriate type with proper message.
    def pull(self):
        """Pull the Threat information from the 3rd party Threat Intel systems.

        Implement the logic of pulling Threat data from 3rd party APIs and
        return the list of netskope.integrations.cte.models.Indicator objects
        on a successful pull; otherwise raise an exception.

        Returns:
            List[netskope.integrations.cte.models.Indicator]: List of indicator
            objects received from the 3rd party Threat Intel systems.
        """
        # Load all the configured plugin parameters as a Python dict object.
        # Use the key name provided in the manifest.json file to get the
        # value of a particular parameter.
        config = self.configuration

        # Get the proxy settings dict, just the way the requests module requires.
        proxy_dict = self.proxy

        # Get the ssl_validation bool for enabling/disabling validation of
        # SSL server certificates.
        ssl_validation = self.ssl_validation

        # datetime.datetime object; None on the first run.
        start_time = self.last_run_at  # noqa: F841

        # How to use the proxy dict and ssl_validation flag.
        resp = requests.get(  # noqa: F841
            "https://www.example.com", proxies=proxy_dict, verify=ssl_validation
        )

        # Get the logger object for logging purposes. This logger object logs
        # to MongoDB under the cte database logs collection. The log timestamp
        # is recorded automatically by the logger library.
        # Supported logging levels are info, warn and error.
        logger = self.logger
        logger.info(f"{PLUGIN_NAME}: Starting pulling data for sample plugin.")
        indicator_list = self.pull_data_from_3rd_party_api(config, logger)
        logger.info(f"{PLUGIN_NAME}: Finished pulling data")
        return indicator_list
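Because a pull can return more records than a single API response holds, pagination is usually part of this method. The sketch below assumes an offset/limit style API; iter_pages, fetch_page, and PAGE_SIZE are hypothetical names, and the real API may page by cursor or token instead.

```python
from typing import Callable, Dict, Iterator, List

PAGE_SIZE = 100  # hypothetical page size


def iter_pages(fetch_page: Callable[[int, int], List[Dict]],
               page_size: int = PAGE_SIZE) -> Iterator[Dict]:
    """Yield every record from an offset/limit style API.

    fetch_page(offset, limit) is a hypothetical callable that returns one
    page of records; a page shorter than limit marks the end of the data.
    """
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size
```

Keeping the HTTP call behind fetch_page also helps with the unit-testing recommendation, since a test can pass a function that slices an in-memory list.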
This is an abstract method of PluginBase Class.
This method implements the logic to push the Threat IOC information shared by the Cloud Exchange platform to the product API endpoints.
It receives all the parameters that the pull method receives. In addition, it receives the list of Indicator objects from the Cloud Exchange platform as a method argument; these are the indicators to be shared with the integrating product.
This method will be invoked when the Cloud Exchange platform receives a new indicator from a source and sharing of indicators is configured with the current plugin configuration.
If the API supports the PATCH method to share the indicators then this method will receive only one indicator object in the list otherwise it will receive all the indicator objects which are returned after applying the sharing filters on all the indicators in the Threat Exchange platform database.
Make sure to handle the case when the maximum payload size supported by API endpoint is exceeded. There can be multiple ways to handle this case.
If the API endpoint supports multiple requests with a fixed payload size, send the data in chunks.
If the API endpoint does not support multiple requests (i.e. we can push it in one API call only) either plugin can skip the remaining indicators and raise a notification to the user to adjust the sharing filters or it can fail with the error of exceeded payload size.
Return the PushResult object (Refer to PushResult Class) with a success flag indicating whether the Push operation was successful or not.
Handle all the Exceptions with connection and HTTP response code.
    def push(self, indicators: List[Indicator]):
        """Push the Indicator list to the 3rd party Threat Intel systems.

        Implement the logic of splitting the indicators list according to
        their type and push the data to the 3rd party APIs. This method will
        be invoked while sharing the Threat information with a 3rd party.

        Args:
            indicators (List[netskope.integrations.cte.models.Indicator]):
            List of Indicator objects to be pushed.

        Returns:
            netskope.integrations.cte.plugin_base.PushResult: PushResult
            object with success flag and Push result message.
        """
        # Load all the configured plugin parameters as a Python dict object.
        # Use the key name provided in the manifest.json file to get the
        # value of a particular parameter.
        config = self.configuration

        # Get the proxy settings dict, just the way the requests module requires.
        proxy_dict = self.proxy

        # Get the ssl_validation bool for enabling/disabling validation of
        # SSL server certificates.
        ssl_validation = self.ssl_validation

        # How to use the proxy dict and ssl_validation flag.
        resp = requests.get(  # noqa: F841
            "https://www.example.com", proxies=proxy_dict, verify=ssl_validation
        )

        # Get the logger object for logging purposes. This logger object logs
        # to MongoDB under the cte database logs collection. The log timestamp
        # is recorded automatically by the logger library.
        # Supported logging levels are info, warn and error.
        logger = self.logger
        logger.info(f"{PLUGIN_NAME}: Starting pushing data for sample plugin.")
        push_result = self.push_data_to_3rd_party_api(config, logger, indicators)
        logger.info(f"{PLUGIN_NAME}: Finished pushing data for sample plugin.")
        return push_result
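When the API endpoint accepts multiple requests with a fixed payload size, the indicators can be sent in chunks. The helper below is a sketch; chunked and MAX_BATCH_SIZE are hypothetical names, and the sketch limits batches by item count, whereas an API with a byte-size limit would need the serialized payload measured instead.

```python
from typing import Iterator, List, Sequence

MAX_BATCH_SIZE = 500  # hypothetical per-request limit of the target API


def chunked(items: Sequence, size: int = MAX_BATCH_SIZE) -> Iterator[List]:
    """Yield consecutive slices of items with at most size elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

A push method could loop over chunked(indicators), send one request per batch, and return a PushResult with success set to False as soon as a batch fails.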
This is an abstract method of PluginBase Class.
This method validates the plugin configuration and authentication parameters passed while creating a plugin configuration.
This method will be called only when a new configuration is created or updated.
Validate that all the mandatory parameters are passed with the proper datatype.
Validate the authentication parameters and the API endpoint to ensure the smooth execution of the plugin lifecycle.
Return the object of ValidationResult (Refer to ValidationResult Class) with a success flag indicating validation success or failure and the validation message containing validation failure reason.
    def validate(self, data):
        """Validate the Plugin configuration parameters.

        Validate all the parameters mentioned in the manifest.json for
        existence and data type. The method returns a
        netskope.integrations.cte.plugin_base.ValidationResult object with
        success = True in the case of successful validation, and
        success = False with an error message in the case of failure.

        Args:
            data (dict): Dict object having all the Plugin configuration
            parameters.

        Returns:
            netskope.integrations.cte.plugin_base.ValidationResult:
            ValidationResult object with success flag and message.
        """
        self.logger.info(
            f"{PLUGIN_NAME}: Executing validate method for Sample plugin"
        )
        if (
            "secret_field_id1" not in data
            or not data["secret_field_id1"]
            or not isinstance(data["secret_field_id1"], str)
        ):
            self.logger.error(
                f"{PLUGIN_NAME}: Validation error occurred. "
                "Error: Secret Field1 is required with type string."
            )
            return ValidationResult(
                success=False,
                message="Invalid Secret Field 1 provided.",
            )
        return ValidationResult(
            success=True, message="Validation Successful for Sample plugin"
        )
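When a plugin has several mandatory parameters, the existence and type checks can be driven from a table instead of repeated if blocks. This is a sketch under assumptions: find_config_error is a hypothetical helper, and base_url and api_token are example keys rather than keys required by the platform.

```python
from typing import Dict, Optional

# Hypothetical parameter keys and their expected Python types.
REQUIRED_PARAMS = {"base_url": str, "api_token": str}


def find_config_error(data: Dict) -> Optional[str]:
    """Return the first validation error message, or None if data is valid."""
    for key, expected_type in REQUIRED_PARAMS.items():
        value = data.get(key)
        if value is None or value == "":
            return f"{key} is a required field."
        if not isinstance(value, expected_type):
            return f"Invalid {key} provided."
    return None
```

The validate method could then return ValidationResult(success=False, message=error) when an error is found. The messages name only the parameter key, never its value, so token and password values stay out of logs and toast messages.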
This is an abstract method of PluginBase Class.
This method should return a list of all the supported actions (displayed as Targets in the UI) if the plugin supports sharing of indicators (i.e. manifest has push_supported=true) otherwise it should return an empty list.
Add all the supported actions in the ActionWithoutParams class and return a list of objects of ActionWithoutParams class.
If the plugin supports sharing of indicators then this method should return at least one action.
If manifest has push_supported=false:
    def get_actions(self):
        """Get available actions.

        Returns:
            List[ActionWithoutParams]: List of ActionWithoutParams objects
            that are supported by the plugin.
        """
        return []
If manifest has push_supported=true:
    def get_actions(self):
        """Get available actions.

        Returns:
            List[ActionWithoutParams]: List of ActionWithoutParams objects
            that are supported by the plugin.
        """
        return [
            ActionWithoutParams(label="Share Indicators", value="share"),
            ActionWithoutParams(label="Add to Group", value="add"),
        ]
This is an abstract method of PluginBase Class.
This method should return the list of fields to be rendered in the UI when a target is selected from dropdown.
This method should be called after the user selects any of the actions.
If the selected action requires any parameters then return a list of dictionaries (where each dictionary is a configurable input) otherwise return an empty list.
Go to Manifest.json to see how fields are defined.
If manifest has push_supported=false:
    def get_action_fields(self, action: Action):
        """Get fields required for an action.

        Args:
            action (Action): Action object which is selected as Target.

        Return:
            List[Dict]: List of configurable fields based on selected action.
        """
        return []
If manifest has push_supported=true:
    def get_action_fields(self, action: Action):
        """Get fields required for an action.

        Args:
            action (Action): Action object which is selected as Target.

        Return:
            List[Dict]: List of configurable fields based on selected action.
        """
        if action.value == "add":
            return [
                {
                    "label": "Group Name",
                    "key": "group_name",
                    "type": "text",
                    "default": "",
                    "mandatory": True,
                    "description": "Name of group.",
                }
            ]
        return []
This is an abstract method of PluginBase Class.
This method validates the action and their parameters.
This method will be called only when the new sharing configuration is created or existing sharing configuration is updated.
Validate that all the mandatory parameters are passed with the proper datatype.
Return the object of ValidationResult (Refer to ValidationResult Class) with a success flag indicating validation success or failure and the validation message containing validation failure reason.
If the plugin is not push supported then return ValidationResult object with a success flag otherwise check for validations.
If manifest has push_supported=false:
def validate_action(self, action: Action):
    """Validate Action Parameters.

    Args:
        action (Action): Action object having all the configurable parameters.

    Return:
        netskope.integrations.cte.plugin_base.ValidationResult:
        ValidationResult object with success flag and message.
    """
    return ValidationResult(success=True, message="Validation successful.")
If manifest has push_supported=true:
def validate_action(self, action: Action):
    """Validate Action Parameters.

    Args:
        action (Action): Action object having all the configurable parameters.

    Return:
        netskope.integrations.cte.plugin_base.ValidationResult:
        ValidationResult object with success flag and message.
    """
    if action.value not in ["share", "add"]:
        return ValidationResult(
            success=False, message="Unsupported action provided."
        )
    if action.value == "add":
        # The field default is an empty string, so reject both a missing
        # value and an empty one.
        if not action.parameters.get("group_name"):
            return ValidationResult(
                success=False, message="Group Name should not be empty."
            )
    return ValidationResult(success=True, message="Validation successful.")
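The checks above can also be driven by the field definitions themselves, so that every field marked as mandatory is validated without a hand-written branch per action. The following is a minimal, self-contained sketch of that idea. Action and ValidationResult here are stand-in dataclasses defined only so the sketch runs on its own (they are not the Cloud Exchange classes), and ACTION_FIELDS is a hypothetical lookup table in the same shape that get_action_fields returns.

```python
from dataclasses import dataclass, field
from typing import Dict, List


# Stand-ins for the Cloud Exchange classes, defined here only so the sketch
# runs on its own. A real plugin imports them from the platform instead.
@dataclass
class Action:
    value: str
    parameters: Dict[str, object] = field(default_factory=dict)


@dataclass
class ValidationResult:
    success: bool
    message: str


# Hypothetical table: action value -> field definitions, in the same shape
# that get_action_fields returns.
ACTION_FIELDS: Dict[str, List[dict]] = {
    "share": [],
    "add": [
        {
            "label": "Group Name",
            "key": "group_name",
            "type": "text",
            "default": "",
            "mandatory": True,
            "description": "Name of group.",
        },
    ],
}


def validate_action(action: Action) -> ValidationResult:
    """Check the action value, then every mandatory text field."""
    if action.value not in ACTION_FIELDS:
        return ValidationResult(False, "Unsupported action provided.")
    for spec in ACTION_FIELDS[action.value]:
        if not spec["mandatory"]:
            continue
        raw = action.parameters.get(spec["key"])
        # Reject missing values, non-strings, and whitespace-only strings.
        if not isinstance(raw, str) or not raw.strip():
            return ValidationResult(
                False, f"{spec['label']} should not be empty."
            )
    return ValidationResult(True, "Validation successful.")
```

With this layout, adding a new mandatory field to an action only requires a new entry in the table; the validation loop stays unchanged.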
This section lists the data models and their properties.
This class provides the Python data model for an IndicatorType object.
This model has 3 fields: URL, SHA256, MD5 of type string.
You will interact with the model in the push method when you send a list of Indicators to the third party.
Data Model Properties
Name | Type | Description |
---|---|---|
URL | string | It can be used for URLs |
SHA256 | string | It can be used for sha256 type of file hashes |
MD5 | string | It can be used for md5 type of file hashes |
import datetime

from netskope.integrations.cte.models import (
    Indicator,
    IndicatorType,
    SeverityType,
)

# behavior_info is one record from the third-party API response. This call
# sits inside a plugin method, so self refers to the plugin instance.
Indicator(
    value=behavior_info.get("ioc_value"),
    type=IndicatorType.SHA256,
    comments=behavior_info.get("ioc_description", ""),
    firstSeen=datetime.datetime.strptime(
        behavior_info.get("timestamp"),
        "%Y-%m-%dT%H:%M:%SZ",
    ),
    lastSeen=datetime.datetime.strptime(
        behavior_info.get("timestamp"),
        "%Y-%m-%dT%H:%M:%SZ",
    ),
    severity=self.get_severity_from_int(behavior_info.get("severity", 0)),
)
This class provides the Python data model for a SeverityType object.
This model has 5 fields: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL of type string.
You will interact with the model in the pull method as you return a list of Indicators.
Data Model Properties
Name | Type | Description |
---|---|---|
UNKNOWN | string | It can be used for unknown severity |
LOW | string | It can be used for low severity |
MEDIUM | string | It can be used for medium severity |
HIGH | string | It can be used for high severity |
CRITICAL | string | It can be used for critical severity |
from netskope.integrations.cte.models import SeverityType


def get_severity_from_int(self, severity):
    """Map a numeric third-party severity score to a SeverityType."""
    if type(severity) is not int or severity == 0:
        return SeverityType.UNKNOWN
    if 10 <= severity <= 39:
        return SeverityType.LOW
    if 40 <= severity <= 69:
        return SeverityType.MEDIUM
    if 70 <= severity <= 89:
        return SeverityType.HIGH
    if 90 <= severity <= 100:
        return SeverityType.CRITICAL
    # Any score outside the ranges above (for example 1-9) is UNKNOWN.
    return SeverityType.UNKNOWN
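The mapping above can be tried outside Cloud Exchange with a small stand-in enum. The sketch below is illustrative only: the SeverityType class is a local stand-in whose member values are assumed, and severity_from_score is a hypothetical helper name. The score ranges are the ones used in the example above, expressed as a table so the boundaries are easy to review.

```python
from enum import Enum


class SeverityType(str, Enum):
    """Local stand-in for the Cloud Exchange enum; member values are assumed."""

    UNKNOWN = "unknown"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# (lowest score, highest score, severity) using the ranges from the example.
SEVERITY_BANDS = [
    (10, 39, SeverityType.LOW),
    (40, 69, SeverityType.MEDIUM),
    (70, 89, SeverityType.HIGH),
    (90, 100, SeverityType.CRITICAL),
]


def severity_from_score(score):
    """Return the SeverityType for an integer score, or UNKNOWN."""
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(score, bool) or not isinstance(score, int):
        return SeverityType.UNKNOWN
    for low, high, severity in SEVERITY_BANDS:
        if low <= score <= high:
            return severity
    # Scores not covered by any band (0, 1-9, above 100, negatives).
    return SeverityType.UNKNOWN
```

Note that scores from 1 to 9 are not covered by any range and therefore map to UNKNOWN, exactly as in the original branches.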
This class provides the Python data model for an ActionWithoutParams object.
This model has 2 fields: label and value of type string.
You will interact with the model in the get_actions method as you return a list of available actions.
Data Model Properties
Name | Type | Description |
---|---|---|
label | string | Label displayed on UI |
value | string | Value of the field |
from netskope.integrations.cte.models.business_rule import ActionWithoutParams

ActionWithoutParams(
    label="Add to Suspicious Object List",
    value="suspicious_object",
)
This class provides the Python data model for an Indicator object.
The pull method returns a list of Indicator objects built from the information received from API calls.
Supported values for the type field are IndicatorType.MD5, IndicatorType.SHA256, and IndicatorType.URL. The MD5 and SHA256 types represent malware indicators, and the URL type represents malsite indicators.
The value of the reputation field has to be in the range of 1-10. If no value is supplied, the reputation field defaults to 5. If a value within that range is supplied, CTE will accept and display it.
Data Model Properties
Name | Type | Description |
---|---|---|
value | string | Indicator value. It can be MD5/SHA256 hash value in the case of Malware indicators, and domain-name/url in case of Malsite indicators. |
type | Enum (IndicatorType) | It can be IndicatorType.MD5 or IndicatorType.SHA256 in the case of Malware indicators, IndicatorType.URL in case of Malsite indicators. |
test | bool | Indicates whether it's a test indicator or not. Default - False |
reputation | int | Reputation score of indicator. Default - 5 |
expiresAt | datetime.datetime | Time after which Indicator will be marked as inactive by Cloud Exchange. |
firstSeen | datetime.datetime | datetime.datetime object indicating when the indicator was discovered for the first time. Default - Current System time when Indicator object was created. |
lastSeen | datetime.datetime | datetime.datetime object indicating when the indicator was discovered for last time. Default - Current System time when Indicator object was created. |
comments | string | Comment string which gives more information about the indicator. |
severity | Enum (SeverityType) | Severity of the indicator. Possible values are: SeverityType.LOW SeverityType.MEDIUM SeverityType.HIGH SeverityType.CRITICAL SeverityType.UNKNOWN |
extendedInformation | string | A link leading to an external source for extended information regarding the indicator. The value in this field will be rendered as a clickable URL in the UI. URL scheme must be either HTTP or HTTPS. |
import datetime

from netskope.integrations.cte.models import Indicator, IndicatorType

Indicator(
    value="md5hash",  # MD5 hash value of the indicator.
    type=IndicatorType.MD5,  # Type of indicator.
    test=True,  # Whether it's a test indicator. Defaults to False.
    reputation=7,  # Reputation score of the indicator. Defaults to 5.
    # Time after which the indicator will be marked as inactive.
    expiresAt=datetime.datetime(2022, 12, 23, 15, 22, 52, 667126),
    # Time when the indicator was first discovered. Defaults to current time.
    firstSeen=datetime.datetime(2017, 1, 11, 15, 22, 52, 667126),
    # Time when the indicator was last discovered. Defaults to current time.
    lastSeen=datetime.datetime(2019, 12, 23, 15, 22, 52, 667126),
    # Comment which gives more information about the indicator.
    comments="Indicator explanation",
)
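Two of the constraints described above are easy to check before an Indicator is built: reputation must fall in the range 1-10, and extendedInformation must be an HTTP or HTTPS URL. The sketch below shows one possible way to do that with the standard library only. Both helper names are hypothetical, and the assumption that the third-party product reports a score from 0 to 100 is made up for illustration; adjust max_score to the product you integrate with.

```python
import math
from urllib.parse import urlparse

DEFAULT_REPUTATION = 5  # default stated in the Indicator description


def to_reputation(score, max_score=100):
    """Scale a hypothetical 0..max_score vendor score into the 1-10 range."""
    is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
    if not is_number or not math.isfinite(score):
        # Fall back to the documented default when no usable score exists.
        return DEFAULT_REPUTATION
    scaled = round(score * 10 / max_score)
    # Clamp so out-of-range vendor values never leave the 1-10 range.
    return max(1, min(10, scaled))


def is_http_url(link):
    """Return True only for http/https URLs that include a host part."""
    if not isinstance(link, str):
        return False
    parsed = urlparse(link)
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)
```

A plugin could then pass reputation=to_reputation(raw_score) and set extendedInformation only when is_http_url(link) is true.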
This class contains the result of Push operation of indicators to the product API endpoint.
The success flag indicates the result of the push operation, and the message field should contain a proper error message in case of failure. (In case of success, return a simple success message with the success flag set to True.)
Data Model Properties
Name | Type | Description |
---|---|---|
success | bool | success flag indicates the result of push operation, whether it succeeded or failed. |
message | string | Message field denotes the error in case of failure. In case of success it can be a simple success message. |
from netskope.integrations.cte.plugin_base import PushResult

PushResult(
    success=True,
    message="Successfully pushed data to 3rd party.",
)
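As an illustration of reporting both outcomes, the sketch below sends values in batches and turns any exception into a failed result with a descriptive message. It is self-contained and makes several assumptions: PushResult is a local stand-in dataclass rather than the Cloud Exchange class, push_in_batches is a hypothetical helper, and send_batch is a hypothetical callable that uploads one batch and raises an exception on failure.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class PushResult:
    """Local stand-in for the Cloud Exchange PushResult class."""

    success: bool
    message: str


def push_in_batches(
    values: Iterable[str],
    send_batch: Callable[[List[str]], None],
    batch_size: int = 2,
) -> PushResult:
    """Send values in batches and report the outcome as a PushResult."""
    items = list(values)
    sent = 0
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        try:
            send_batch(batch)
        except Exception as exc:
            # Report the failure through the result instead of raising.
            return PushResult(
                success=False,
                message=f"Push failed after {sent} indicator(s): {exc}",
            )
        sent += len(batch)
    return PushResult(
        success=True,
        message=f"Successfully pushed {sent} indicator(s).",
    )
```

Including the number of indicators already sent in the failure message gives the user a starting point when a push stops partway.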
This class contains the result of the validation process on the plugin configuration parameters passed to the Plugin object while creating a new configuration for the plugin.
Make sure that all the parameters passed to the validate method are validated against the data-type and value.
The validate method returns an object of this class with a success flag indicating the result of the validation and a message field containing a proper error message in case of validation failure. (In case of success, return a simple success message with the success flag set to True.)
Data Model Properties
Name | Type | Description |
---|---|---|
success | bool | success flag indicates the result of validation operation, whether it succeeded or failed. |
message | string | Message field denotes the error in case of failure. In case of success it can be a simple success message. |
from netskope.integrations.cte.plugin_base import ValidationResult

ValidationResult(
    success=True,
    message="Validation successful for Sample plugin",
)
Cloud Exchange provides a handle to a logger object for logging.
Avoid print statements in the code.
This object logs to the central Cloud Exchange database with the timestamp field. Supported log levels are info, warn, and error.
Make sure any API authentication secret or any sensitive information is not exposed in the log messages.
Make sure to implement a proper logging mechanism with the logger object passed by the CE platform.
Make sure enough logging is done, which helps the Ops team in troubleshooting.
Make sure any sensitive data is not logged or leaked in the notification or logs.
self.logger.error(
    f"{PLUGIN_NAME}: Error log-message goes here."
)
self.logger.warn(
    f"{PLUGIN_NAME}: Warning log-message goes here."
)
self.logger.info(
    f"{PLUGIN_NAME}: Info log-message goes here."
)
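One way to keep secrets out of log messages is to mask them before a configuration is ever formatted into a string. The sketch below is a minimal illustration using the standard logging module as a stand-in for self.logger. The redact helper and the SENSITIVE_KEYS set are hypothetical; list the keys that are actually sensitive in your plugin configuration.

```python
import logging

# Assumed key names; extend this set to match your plugin configuration.
SENSITIVE_KEYS = {"api_token", "password", "secret", "client_secret"}

PLUGIN_NAME = "Sample Plugin"


def redact(config: dict) -> dict:
    """Return a copy of config with sensitive values masked."""
    return {
        key: "***" if key.lower() in SENSITIVE_KEYS else value
        for key, value in config.items()
    }


# Stand-in for the logger object that Cloud Exchange passes to the plugin.
logger = logging.getLogger("sample_plugin")

config = {"tenant_name": "partners", "api_token": "abc"}
logger.info(f"{PLUGIN_NAME}: Pulling with configuration {redact(config)}.")
```

Because redact returns a copy, the original configuration stays intact for the API calls that need the real token.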
Notifications
Cloud Exchange provides a handle of the notification object, which can be used to generate notifications on Cloud Exchange UI.
This object is passed from Cloud Exchange to the plugin object. Every plugin integration can use this object whenever there is a case of failure, which has to be notified to the user immediately. The notification timestamp is managed by Cloud Exchange.
This object will raise the notification in the UI with color-coding for the severity of the failure. Supported notification severities are info, warn, and error.
Make sure any API authentication secret or any sensitive information is not exposed in the notification messages.
Use a notifier object to raise the notification for failures or critical situations (like rate-limiting, or exceeding payload size) to notify the status of the plugin to the user.
self.notifier.info(
    f"{PLUGIN_NAME}: Info notification-message goes here."
)
self.notifier.error(
    f"{PLUGIN_NAME}: Error notification-message goes here."
)
self.notifier.warn(
    f"{PLUGIN_NAME}: Warning notification-message goes here."
)
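The situations mentioned above (rate limiting, oversized payloads) usually surface as HTTP status codes, so one possible pattern is a small helper that decides which notification to raise for a given response status. The sketch below is illustrative: RecordingNotifier is a stand-in for self.notifier that only stores messages, notify_for_status is a hypothetical helper, and the choice of status codes and severities is an assumption rather than platform behavior.

```python
PLUGIN_NAME = "Sample Plugin"


class RecordingNotifier:
    """Stand-in for self.notifier that stores messages instead of showing them."""

    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(("info", message))

    def warn(self, message):
        self.messages.append(("warn", message))

    def error(self, message):
        self.messages.append(("error", message))


def notify_for_status(notifier, status_code):
    """Raise a notification for statuses the user should know about.

    Returns True when a notification was raised, False otherwise.
    """
    if status_code == 429:
        notifier.warn(f"{PLUGIN_NAME}: Third-party API rate limit reached.")
    elif status_code == 413:
        notifier.error(f"{PLUGIN_NAME}: Payload size limit exceeded.")
    elif status_code in (401, 403):
        # Never include the token or other secrets in the message.
        notifier.error(f"{PLUGIN_NAME}: Authentication failed.")
    else:
        return False
    return True
```

Keeping this decision in one place makes it easier to review that no notification message contains sensitive data.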
Tagging
Threat Exchange provides a utility class to handle tagging related functionalities from the plugin push/pull/validate methods. Below are some of the examples of what it can be used for.
from netskope.integrations.cte.models import TagIn
from netskope.integrations.cte.utils import TagUtils

utils = TagUtils()
# Create the tag using the imported TagIn model.
utils.create_tag(TagIn(name="Tag Name", color="#FF0000"))
utils = TagUtils()
if utils.exists("Tag Name"):
    pass  # tag already exists
utils = TagUtils()
utils.on_indicators({"source": "Test"}).remove("Tag Name")
This removes the Tag Name tag from all the indicators with source="Test". The first and only argument to `on_indicators` is a `dict` object, which has to be a valid MongoDB query.
utils = TagUtils()
utils.on_indicators({"source": "Test"}).add("Tag Name")
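Because the argument to `on_indicators` is a MongoDB query, standard query operators such as `$in` can be used to select indicators from several sources at once. The sketch below only builds the query dictionary, so it runs without Cloud Exchange. The source_query helper is hypothetical, and it assumes that indicators carry a source field as in the examples above.

```python
def source_query(sources):
    """Build a MongoDB filter matching indicators from the given sources."""
    names = sorted(set(sources))
    if len(names) == 1:
        # A single source needs only a plain equality match.
        return {"source": names[0]}
    # $in matches documents whose source equals any value in the list.
    return {"source": {"$in": names}}


query = source_query(["Test", "Test 2"])

# In a plugin, with TagUtils imported as shown earlier in this section:
# TagUtils().on_indicators(query).add("Tag Name")
```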
Testing
As part of the build process, we run a few linters to catch common programming errors, stylistic errors, and possible security issues.
Flake8 is a basic linter. It can be run without having all the dependencies available and will catch common errors. We also use this linter to enforce the standard Python PEP 8 formatting style. On rare occasions, you may need to disable an error or warning returned by this linter. Do this by adding an inline comment of the following form on the line for which you want to disable the error:
# noqa: <error-id>
For example:
example = lambda: 'example'  # noqa: E731
When adding an inline comment always also include the error code you are disabling for. That way if there are other errors on the same line they will be reported.
Refer to: https://flake8.pycqa.org/en/latest/user/violations.html#in-line-ignoring-errors
PEP8 style docstring check is also enabled with the flake8 linter. So make sure every function/module has a proper docstring added.
Unit Testing
Use unit tests to test small units of code in an isolated and deterministic fashion. Unit tests should avoid communicating with external APIs and use mocking instead. Make sure the unit tests keep code coverage above 70%.
To work with unit testing, the integration or automation script needs to be developed following the Plugin Directory Structure. We use pip to install all the Python module dependencies required to run the setup. Before running the tests, make sure you install all the dependencies listed in the requirements.txt of the Cloud Exchange core repository.
Make sure unit tests are written in a separate Python file named: <your_plugin_name>_test.py. Within the unit test file, each unit test function should be named: test_<your test case>. More information on writing unit tests and their format is available at the PyTest Docs.
We use pytest-mock for mocking. pytest-mock is enabled by default and installed in the base environment mentioned above. To use a mocker object simply pass it as a parameter to your test function. The mocker can then be used to mock both the plugin class object and also external APIs.
Example
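from netskope.integrations.cte.models import Indicator, IndicatorType

# NetskopePlugin and logger are assumed to be imported or created elsewhere
# in the test module.


def test_netskope_pull_success(mocker):
    # Replace the real pull method so no external API is contacted.
    mocker.patch("cte.plugins.netskope.main.NetskopePlugin.pull")
    pull_return_result = [
        Indicator(value="ind1", type=IndicatorType.MD5),
        Indicator(value="ind2", type=IndicatorType.SHA256),
        Indicator(value="ind3", type=IndicatorType.URL),
    ]
    NetskopePlugin.pull.return_value = pull_return_result
    ns = NetskopePlugin(None, None, None, logger)
    actual_pull = ns.pull()
    assert pull_return_result == actual_pull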
To mock request module response for API calls (requests_mock Pytest plugin is installed with all the dependencies):
import datetime
import time

from netskope.integrations.cte.models import Indicator, IndicatorType

# NetskopePlugin and logger are assumed to be imported or created elsewhere
# in the test module.


def test_fetch_threat_data_malware(requests_mock):
    endpoint_url = "https://example-api.com"
    mock_response_json = {
        "status": "success",
        "data": [
            {"local_md5": "ind1"},
            {"local_md5": "ind2"},
            {"local_md5": "ind3"},
        ],
    }
    # Every GET request to endpoint_url now returns the mocked JSON.
    requests_mock.get(endpoint_url, json=mock_response_json)
    config_dict = {
        "api_token": "abc",
        "tenant_name": "partners",
        "file_list": "test",
        "url_list": "sample",
        "threat_data_type": "URL",
        "is_pull_required": "Yes",
        "max_file_hash_cap": 8,
        "max_url_list_cap": 8,
    }
    ns = NetskopePlugin(config_dict, None, None, logger)
    actual_ind_list = ns.fetch_threat_data(
        endpoint_url,
        config_dict["api_token"],
        datetime.datetime.now(),
        time.time(),
        "malware",
    )
    indicator_list = [
        Indicator(value="ind1", type=IndicatorType.MD5),
        Indicator(value="ind2", type=IndicatorType.MD5),
        Indicator(value="ind3", type=IndicatorType.MD5),
    ]
    assert len(actual_ind_list) == len(indicator_list)
    for i in range(len(actual_ind_list)):
        assert actual_ind_list[i].value == indicator_list[i].value
$ PYTHONPATH=. pytest
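The isolation idea behind these tests can also be tried without Cloud Exchange installed. The sketch below uses unittest.mock from the standard library (the library that pytest-mock wraps) to replace an HTTP session, so the test never touches the network. The fetch_hashes helper is hypothetical and exists only for this illustration; the response shape mirrors the mocked JSON used in the example above.

```python
from unittest import mock


def fetch_hashes(session, url):
    """Hypothetical helper: GET url and return the local_md5 values."""
    response = session.get(url, timeout=30)
    response.raise_for_status()
    return [item["local_md5"] for item in response.json()["data"]]


def test_fetch_hashes_uses_mocked_session():
    # The Mock stands in for a requests.Session-like object.
    session = mock.Mock()
    session.get.return_value.json.return_value = {
        "status": "success",
        "data": [{"local_md5": "ind1"}, {"local_md5": "ind2"}],
    }

    result = fetch_hashes(session, "https://example-api.com")

    assert result == ["ind1", "ind2"]
    # Verify the helper called the API exactly once with the expected URL.
    session.get.assert_called_once_with("https://example-api.com", timeout=30)
```

Passing the session in as a parameter is a design choice that makes the code under test easy to mock without patching module paths.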
Cloud Exchange expects the developed plugin to be in zip or tar.gz format.
Execute this command to zip the package:
zip -r sample_plugin.zip sample_plugin
Execute this command to generate tar.gz package:
tar -zcvf sample_plugin.tar.gz sample_plugin
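If the packaging step needs to run on a system without the zip or tar commands, the same archives can be produced from Python with the standard library. The sketch below is one possible approach, not part of the Cloud Exchange tooling; build_archives is a hypothetical helper name. It keeps the plugin folder as the top-level entry of each archive, matching the commands above.

```python
import shutil
from pathlib import Path


def build_archives(plugin_dir: str, out_dir: str) -> list:
    """Create <name>.zip and <name>.tar.gz with the plugin folder at the root.

    out_dir should be outside plugin_dir so the archives do not include
    themselves.
    """
    source = Path(plugin_dir).resolve()
    target = Path(out_dir).resolve()
    target.mkdir(parents=True, exist_ok=True)
    archives = []
    for fmt in ("zip", "gztar"):
        archives.append(
            shutil.make_archive(
                str(target / source.name),  # archive path without extension
                fmt,
                root_dir=str(source.parent),  # archive paths are relative to this
                base_dir=source.name,  # keep the top-level plugin folder
            )
        )
    return archives
```

For example, build_archives("sample_plugin", "dist") would write sample_plugin.zip and sample_plugin.tar.gz into a dist folder.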
Use the zip or tar.gz file to deploy the plugin:
In Cloud Exchange, go to Settings > Plugins.
Click Add New Plugin.
Click Browse.
Select the zip or tar.gz file.
Click Upload.
Note
This plugin is supported for the Threat Exchange module only.
Add a Repository
To deploy your plugin on the Cloud Exchange platform, you can add a repository to store your plugin.
In Cloud Exchange, go to Settings > Plugin Repository.
Click Configure New Repository.
Enter a Repository name, Repository URL, Username, and Personal Access Token.
Click Save.
Go to Settings > Plugins.
Select the Repository name from the Repository dropdown.