#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: Erdog, Loveforkeeps, Alluka
import base64
import json
import logging
import requests
import os
import sys
from retry.api import retry_call
# The sibling modules are imported differently depending on how this file
# is loaded: package-relative when imported as part of the package,
# plain top-level imports when the file is executed directly as a script.
if __name__ != '__main__':
    # Imported as a package / module.
    from .exception import FofaError
    from .helper import get_language, encode_query
else:
    # Executed directly as a script.
    from exception import FofaError
    from helper import get_language, encode_query
class Client:
    """
    A class representing the FOFA client.

    :param key: The Fofa api key. If not specified, it will be read from
        the FOFA_KEY environment variable.
    :type key: str
    :param base_url: The base URL of the FOFA API. If not specified, it is
        read from the FOFA_BASE_URL environment variable and finally
        defaults to 'https://fofa.info'.
    :type base_url: str
    :param proxies: A proxies mapping for the requests library,
        e.g. {'https': 'your proxy'}
    :type proxies: dict
    """

    def __init__(self, key='', base_url='', proxies=None):
        """ Initialize the FOFA client.
        """
        # An explicitly passed key must win over the environment; the
        # environment is only a fallback for an empty argument.
        self.key = self._env_default(key, 'FOFA_KEY')
        self.base_url = self._env_default(
            base_url, 'FOFA_BASE_URL', 'https://fofa.info').rstrip('/')

        # API messages are requested in Chinese only for zh* system locales.
        sys_lang = get_language()
        if sys_lang is not None and sys_lang.startswith('zh'):
            self.lang = 'zh-CN'
        else:
            self.lang = 'en'

        self._session = requests.Session()
        if proxies:
            self._session.proxies.update(proxies)
            # Explicit proxies were supplied, so do not let requests pick
            # up proxy settings from the environment as well.
            self._session.trust_env = False

        # retry config, in seconds
        self.tries = 5  # Number of retry attempts
        self.delay = 1  # Initial delay between retries
        self.max_delay = 60  # Maximum delay between retries
        self.backoff = 2  # Backoff factor for exponential backoff

    @staticmethod
    def _env_default(value, env_name, default=''):
        """Return *value* if it is non-empty, else the environment variable
        *env_name*, else *default*."""
        if value:
            return value
        return os.environ.get(env_name, default)

    @staticmethod
    def _merge_opts(opts, **extra):
        """Return a NEW dict made of *opts* overlaid with *extra*.

        The caller's *opts* mapping is never modified, so neither request
        parameters nor the API key can leak back into it (or into a shared
        default argument).
        """
        merged = dict(opts) if opts else {}
        merged.update(extra)
        return merged

    def _with_auth(self, params):
        """Return a copy of *params* (or an empty dict for None) with the
        'key' and 'lang' request parameters filled in."""
        req_param = dict(params) if params is not None else {}
        req_param['key'] = self.key
        req_param['lang'] = self.lang
        return req_param

    def get_userinfo(self):
        """
        Get user info for current user.

        :return: User information in JSON format.
        :rtype: dict
        :raises FofaError: If the API key is empty or the API reports an error.

        :Example:

        The returned JSON result will be in the following format:

        .. code-block:: json

            {
                "username": "sample",
                "fofacli_ver": "4.0.3",
                "fcoin": 0,
                "error": false,
                "fofa_server": true,
                "avatar": "https://nosec.org/missing.jpg",
                "vip_level": 0,
                "is_verified": false,
                "message": "",
                "isvip": false,
                "email": "username@sample.net"
            }
        """
        return self.__do_req("/api/v1/info/my")

    def search(self, query_str, page=1, size=100, fields="", opts=None):
        """
        Search data in FOFA.

        :param query_str: The search query string.

            Example 1:
                'ip=127.0.0.1'

            Example 2:
                'header="thinkphp" || header="think_template"'
        :type query_str: str
        :param page: Page number. Default is 1.
        :type page: int
        :param size: Number of results to be returned in one page. Default is 100.
        :type size: int
        :param fields: Comma-separated list of fields to be included in the query result.

            Example:
                'ip,city'
        :type fields: str
        :param opts: Additional options for the query. This should be a
            dictionary of key-value pairs. It is copied, never modified.
        :type opts: dict
        :return: Query result in JSON format.
        :rtype: dict

        .. code-block:: json

            {
                "results": [
                    [
                        "111.**.241.**:8111",
                        "111.**.241.**",
                        "8111"
                    ],
                    [
                        "210.**.181.**",
                        "210.**.181.**",
                        "80"
                    ]
                ],
                "mode": "extended",
                "error": false,
                "query": "app=\\"网宿科技-公司产品\\"",
                "page": 1,
                "size": 2
            }
        """
        param = self._merge_opts(
            opts,
            qbase64=encode_query(query_str),
            page=page,
            fields=fields,
            size=size,
        )
        logging.debug("search '%s' page:%d size:%d", query_str, page, size)
        return self.__do_req('/api/v1/search/all', param)

    def can_use_next(self):
        """
        Check if the "search_next" API can be used.

        The check sends a deliberately invalid query to the endpoint and
        inspects the resulting error code.

        :return: True if the "search_next" API can be used, False otherwise.
        :rtype: bool
        """
        try:
            self.search_next('bad=query', size=1)
        except FofaError as e:
            # NOTE(review): 820000 is presumably the "query syntax error"
            # code, i.e. the request got past the permission check and only
            # the bogus query was rejected -- confirm against the API docs.
            if e.code == 820000:
                return True
        # Any other outcome (different error, or no error at all) is
        # reported as "not usable", always as a real bool.
        return False

    def search_next(self, query_str, fields='', size=100, next='', full=False, opts=None):
        """
        Query the next page of search results.

        :param query_str: The search query string.

            Example 1:
                'ip=127.0.0.1'

            Example 2:
                'header="thinkphp" || header="think_template"'
        :type query_str: str
        :param fields: The fields to be included in the response.
            When left empty the API's own default is used
            (documented as 'host,ip,port').
        :type fields: str
        :param size: The number of results to be returned per page.
            Default: 100
            Maximum: 10,000
        :type size: int
        :param next: The ID for pagination.
            The next value is returned in the response of previous search query.
            If not provided, the first page of results will be returned.
        :type next: str
        :param full: Specify if all data should be searched.
            Default: False (search within the past year)
            Set to True to search all data.
        :type full: bool
        :param opts: Additional options for the search. It is copied,
            never modified.
        :type opts: dict
        :return: The query result in JSON format.
        :rtype: dict
        """
        param = self._merge_opts(
            opts,
            qbase64=encode_query(query_str),
            fields=fields,
            size=size,
            full=full,
        )
        # Only send a cursor when the caller actually supplied one.
        if next:
            param['next'] = next
        logging.debug("search next for '%s' size:%d, next:%s", query_str, size, next)
        return self.__do_req('/api/v1/search/next', param)

    def search_stats(self, query_str, size=5, fields='', opts=None):
        """
        Query the statistics of the search results.

        :param query_str: The search query string.

            Example 1:
                'ip=127.0.0.1'

            Example 2:
                'header="thinkphp" || header="think_template"'
        :type query_str: str
        :param size: The number of results to be aggregated for each item.
            Default: 5
        :type size: int
        :param fields: The fields to be included in the aggregation.
            Example: 'ip,city'
        :type fields: str
        :param opts: Additional options for the search. It is copied,
            never modified.
        :type opts: dict
        :return: query result in json format
        :rtype: dict

        .. code-block:: json

            {
                "distinct": {
                    "ip": 1717,
                    "title": 411
                },
                "lastupdatetime": "2022-06-17 13:00:00",
                "aggs": {
                    "title": [
                        {
                            "count": 35,
                            "name": "百度一下,你就知道"
                        },
                        {
                            "count": 25,
                            "name": "百度网盘-免费云盘丨文件共享软件丨超大容量丨存储安全"
                        },
                        {
                            "count": 16,
                            "name": "百度智能云-登录"
                        },
                        {
                            "count": 2,
                            "name": "百度翻译开放平台"
                        }
                    ],
                    "countries": []
                },
                "error": false
            }
        """
        param = self._merge_opts(
            opts,
            qbase64=encode_query(query_str),
            fields=fields,
            size=size,
        )
        return self.__do_req('/api/v1/search/stats', param)

    def search_host(self, host, detail=False, opts=None):
        """
        Search for host information based on the specified IP address or domain.

        :param host: The IP address or domain of the host to search for.
        :type host: str
        :param detail: Optional. Specifies whether to show detailed information. Default is False.
        :type detail: bool
        :param opts: Optional. Additional options for the search. It is
            copied, never modified. Default is no extra options.
        :type opts: dict
        :return: The query result in JSON format.
        :rtype: dict

        .. code-block:: json

            {
                "error": false,
                "host": "78.48.50.249",
                "ip": "78.48.50.249",
                "asn": 6805,
                "org": "Telefonica Germany",
                "country_name": "Germany",
                "country_code": "DE",
                "protocol": [
                    "http",
                    "https"
                ],
                "port": [
                    80,
                    443
                ],
                "category": [
                    "CMS"
                ],
                "product": [
                    "Synology-WebStation"
                ],
                "update_time": "2022-06-11 08:00:00"
            }
        """
        param = self._merge_opts(opts, detail=detail)
        u = '/api/v1/host/%s' % host
        return self.__do_req(u, param)

    def __do_req(self, path, params=None, method='get'):
        """
        Send one authenticated request to the API and return the decoded JSON.

        The request is retried through ``retry_call`` using the instance's
        ``tries`` / ``delay`` / ``max_delay`` / ``backoff`` settings; every
        non-200 response counts as a failed attempt.

        :param path: API path appended to ``base_url``.
        :type path: str
        :param params: Request parameters; copied before 'key' and 'lang'
            are added, so the caller's dict is left untouched.
        :type params: dict or None
        :param method: HTTP method name passed to the session.
        :type method: str
        :return: The decoded JSON response.
        :rtype: dict
        :raises FofaError: If the API key is empty or the response has a
            truthy 'error' field.
        :raises Exception: If the final attempt still ends with a non-200 status.
        """
        if not self.key:
            raise FofaError("Empty fofa api key")

        u = self.base_url + path
        req_param = self._with_auth(params)

        # For POST the parameters are additionally sent as the form body
        # (they are still sent in the query string as well, as before).
        data = None
        if method == 'post' and params is not None:
            data = req_param

        def make_request():
            headers = {"Accept-Encoding": "gzip"}
            response = self._session.request(url=u, method=method, data=data,
                                             params=req_param, headers=headers)
            if response.status_code != 200:
                raise Exception("Request failed with status code: {}".format(response.status_code))
            return response

        res = retry_call(make_request,
                         tries=self.tries,
                         delay=self.delay,
                         max_delay=self.max_delay,
                         backoff=self.backoff)
        result = res.json()
        if 'error' in result and result['error']:
            raise FofaError(result['errmsg'])
        return result
if __name__ == "__main__":
    # Manual smoke test: exercise each public endpoint once and print the
    # result. Needs a valid API key (FOFA_KEY) and network access.
    client = Client()
    logging.basicConfig(level=logging.DEBUG)

    print(client.can_use_next())

    # Deferred calls so each request is only sent right before its result
    # is printed, in the listed order.
    demo_requests = (
        lambda: client.get_userinfo(),
        lambda: client.search('app="网宿科技-公司产品"', page=1),
        lambda: client.search_host('78.48.50.249', detail=True),
        lambda: client.search_stats('domain="baidu.com"', fields='title'),
    )
    for send in demo_requests:
        print(json.dumps(send(), ensure_ascii=False))