请注意,本文编写于 140 天前,最后修改于 108 天前,其中某些信息可能已经过时。
一、Python脚本:
任意一台有python环境并可以访问zabbix的主机即可。
import requests
import json
import requests
import json
# Zabbix API 配置
zabbix_url = "http://your_zabbix_server/api_jsonrpc.php" #替换your_zabbix_server为你的zabbix服务地址
zabbix_user = "your_username" #替换your_username为你的用户名
zabbix_password = "your_password" #替换your_password为你的用户密码
group_id = "your_group_id" #替换your_group_id为你的zabbix主机群组id
# Function to read IP addresses from file
def read_ip_list(file_path):
with open(file_path, 'r') as file:
return [line.strip() for line in file.readlines()]
# Login and get authentication token
def get_auth_token():
payload = {
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": zabbix_user,
"password": zabbix_password
},
"id": 1,
"auth": None
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result:
return result['result']
else:
print("Failed to get auth token:", result)
raise Exception("Failed to authenticate")
# Check if host exists
def host_exists(auth_token, ip):
payload = {
"jsonrpc": "2.0",
"method": "host.get",
"params": {
"filter": {
"host": [ip]
}
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result and len(result['result']) > 0:
return result['result'][0]['hostid']
return None
# Create or update host
def create_or_update_host(auth_token, ip, group_id):
host_name = f"Host_{ip.replace('.', '_')}" # 创建唯一的主机名
payload = {
"jsonrpc": "2.0",
"method": "host.create",
"params": {
"host": host_name,
"interfaces": [
{
"type": 1,
"main": 1,
"useip": 1,
"ip": ip,
"dns": "",
"port": "10050"
}
],
"groups": [
{
"groupid": group_id
}
],
"name": ip
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result:
host_id = result['result']['hostids'][0]
return host_id, host_name
else:
print(f"Failed to create/update host {ip}: {result}")
raise Exception("Failed to create/update host")
def get_host_info(auth_token, host_id):
payload = {
"jsonrpc": "2.0",
"method": "host.get",
"params": {
"hostids": host_id,
"output": ["host"]
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result and result['result']:
return result['result'][0]
else:
raise Exception("Failed to get host info")
# Get interface ID for the host
def get_interface_id(auth_token, host_id):
payload = {
"jsonrpc": "2.0",
"method": "hostinterface.get",
"params": {
"output": "extend",
"hostids": host_id
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result and len(result['result']) > 0:
return result['result'][0]['interfaceid']
else:
print(f"Failed to get interface ID for host ID {host_id}: {result}")
raise Exception("Failed to get interface ID")
# Create ICMP ping item
def create_icmp_item(auth_token, host_id, interface_id):
# First, check if the item already exists
payload = {
"jsonrpc": "2.0",
"method": "item.get",
"params": {
"output": "extend",
"hostids": host_id,
"search": {
"key_": "icmpping"
}
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result and result['result']:
print(f"ICMP ping item already exists for host ID {host_id}")
return result['result'][0]['itemid']
# If item doesn't exist, create it
payload = {
"jsonrpc": "2.0",
"method": "item.create",
"params": {
"name": "ICMP ping",
"key_": "icmpping",
"hostid": host_id,
"type": 3, # Simple check
"value_type": 3,
"interfaceid": interface_id,
"delay": "60s"
},
"auth": auth_token,
"id": 1
}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result:
return result['result']['itemids'][0]
else:
print(f"Failed to create ICMP item for host ID {host_id}: {result}")
raise Exception("Failed to create ICMP item")
# Create ICMP ping trigger
def create_icmp_trigger(auth_token, host_name):
payload = {
"jsonrpc": "2.0",
"method": "trigger.create",
"params": [{
"description": "ICMP ping down",
"expression": f"last(/{host_name}/icmpping)=0",
"priority": 4,
"status": 0
}],
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result:
return result['result']['triggerids'][0]
else:
print(f"Failed to create trigger for host {host_name}: {result}")
raise Exception("Failed to create trigger")
# Check if trigger exists
def get_existing_trigger(auth_token, host_name):
payload = {
"jsonrpc": "2.0",
"method": "trigger.get",
"params": {
"output": ["triggerid"],
"host": host_name,
"search": {
"description": "ICMP ping down"
}
},
"auth": auth_token,
"id": 1
}
headers = {'content-type': 'application/json'}
response = requests.post(zabbix_url, data=json.dumps(payload), headers=headers)
result = response.json()
if 'result' in result and result['result']:
return result['result'][0]['triggerid']
return None
# Main function
def main():
auth_token = get_auth_token()
ip_list = read_ip_list('ip_list.txt')
for ip in ip_list:
try:
host_id, host_name = create_or_update_host(auth_token, ip, group_id)
print(f"Created/Updated host: {host_name} (ID: {host_id})")
interface_id = get_interface_id(auth_token, host_id)
item_id = create_icmp_item(auth_token, host_id, interface_id)
existing_trigger = get_existing_trigger(auth_token, host_name)
if existing_trigger:
print(f"Trigger for host {host_name} already exists with ID: {existing_trigger}")
else:
try:
trigger_id = create_icmp_trigger(auth_token, host_name)
print(f"Created trigger for host {host_name}. Trigger ID: {trigger_id}")
except Exception as trigger_error:
print(f"Error creating trigger for host {host_name}: {trigger_error}")
except Exception as e:
print(f"Error processing IP {ip}: {e}")
if __name__ == "__main__":
main()
主机群组id,可以按图示查看:
点到具体群组后,可以在网址栏查看:
二、使用脚本:
- 将脚本中的
zabbix_url
,zabbix_user
,zabbix_password
,group_id
替换为您的实际值。 - 创建一个名为
ip_list.txt
的文件,每行包含一个要监控的IP地址。 - 运行脚本:
python3 zabbix_add_hosts.py
附件:
更改后缀为.py即可使用