#!/usr/bin/lua
-- socket and related libraries for network communication
local socket = require("socket")
local http = require("socket.http")
local ltn12 = require("ltn12")
-- SSDP multicast address and port used for discovery.
local SSDP_ADDR = '239.255.255.250'
local SSDP_PORT = 1900
-- M-SEARCH request for devices offering the WANIPConnection:1 service.
-- Each header line ends in CRLF and an empty line terminates the message.
local msearch_request = string.format(
  'M-SEARCH * HTTP/1.1\r\n' ..
  'HOST: %s:%d\r\n' ..
  'MAN: "ssdp:discover"\r\n' ..
  'ST: urn:schemas-upnp-org:service:WANIPConnection:1\r\n' ..
  'MX: 3\r\n' ..
  '\r\n',
  SSDP_ADDR, SSDP_PORT)
--- Extract the device base URL from the LOCATION header of an SSDP response.
-- The base URL is everything in the header value that precedes
-- "/gatedesc.xml" (for example "http://192.168.1.1:5000").
-- @param response_text string  Raw SSDP response (header lines).
-- @return string|nil  The base URL, or nil when no LOCATION header that
--                     points at a gatedesc.xml description is present.
local function extract_base_url(response_text)
  if type(response_text) ~= "string" then
    return nil
  end
  -- Inspect one header line at a time so a greedy capture can never run
  -- across a line break into a later header.
  for line in response_text:gmatch("[^\r\n]+") do
    local name, value = line:match("^%s*([^:%s]+)%s*:%s*(.-)%s*$")
    -- Header names are compared case-insensitively ("Location:" is as
    -- valid as "LOCATION:").
    if name and name:lower() == "location" then
      -- The dot is escaped so it only matches a literal '.'.
      local base = value:match("^(.+)/gatedesc%.xml")
      if base then
        return base
      end
    end
  end
  return nil
end
-- Work out which response element holds the value for a request.
-- Uses the element name found in req_info.tag; when the tag is empty, falls
-- back to the WANCommonInterfaceConfig naming convention
-- (action "GetXyz" -> output argument "NewXyz").
local function response_element_name(req_info)
  local name = (req_info.tag or ""):match("[%w_:%.%-]+")
  if name then
    return name
  end
  return "New" .. (req_info.action:gsub("^Get", ""))
end

-- Return the trimmed text content of the first <name>...</name> element in
-- xml (attributes on the opening tag are tolerated), or nil if absent.
local function find_element_text(xml, name)
  -- Escape every non-alphanumeric so the name is matched literally.
  local esc = name:gsub("%W", "%%%0")
  local rest = "%s*(.-)%s*</" .. esc .. "%s*>"
  return xml:match("<" .. esc .. ">" .. rest)
      or xml:match("<" .. esc .. "%s[^>]*>" .. rest)
end

--- Fetch one UPnP traffic counter over SOAP and print it.
-- @param base_url string  Device base URL without a trailing slash.
-- @param req_info table   Fields read: action (SOAP action name), tag
--                         (opening tag of the response element holding the
--                         value; may be empty), label (text inserted into
--                         the printed messages).
-- Returns nothing; the result or a failure report is written with print.
local function get_upnp_data(base_url, req_info)
  local service_type = "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1"
  -- Build the control URL for the request.
  local url = base_url .. "/upnp/control/WANCommonIFC1"
  -- SOAP envelope invoking the action with no input arguments.
  local soap_data = string.format(table.concat({
    '<?xml version="1.0"?>',
    '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
      .. ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">',
    '<s:Body>',
    '<u:%s xmlns:u="%s"></u:%s>',
    '</s:Body>',
    '</s:Envelope>',
    ''
  }, "\r\n"), req_info.action, service_type, req_info.action)
  local headers = {
    ["Content-Type"] = "text/xml; charset=\"utf-8\"",
    -- Without Content-Length LuaSocket sends the body chunk-encoded.
    ["Content-Length"] = tostring(#soap_data),
    ["SOAPAction"] = '"' .. service_type .. '#' .. req_info.action .. '"'
  }
  local response_body = {}
  -- http.request has no per-request timeout field; the module-level
  -- TIMEOUT is what applies, so set it for this call and restore it after.
  local saved_timeout = http.TIMEOUT
  http.TIMEOUT = 10
  local _, status_code = http.request{
    url = url,
    method = "POST",
    headers = headers,
    source = ltn12.source.string(soap_data),
    sink = ltn12.sink.table(response_body)
  }
  http.TIMEOUT = saved_timeout
  local response_text = table.concat(response_body)
  if status_code == 200 or status_code == "200 OK" then
    local bytes_string = find_element_text(response_text, response_element_name(req_info))
    -- Accept plain decimal digits only, so tonumber cannot yield nil or a
    -- fractional value that "%d" would reject.
    local total_bytes = bytes_string and bytes_string:match("^%d+$") and tonumber(bytes_string)
    if total_bytes then
      print(string.format("合計%sバイト数: %d", req_info.label, total_bytes))
    else
      print(req_info.label .. "データが見つかりませんでした。")
      print("応答内容:\n" .. response_text)
    end
  else
    -- On transport failure status_code carries the error message instead.
    print(req_info.label .. "データの取得に失敗しました。")
    print("ステータスコード: " .. tostring(status_code or "不明"))
    print("応答内容:\n" .. response_text)
  end
end
-- Main logic: discover the device with an SSDP M-SEARCH, then query its
-- received/sent byte counters.
local base_url = nil
-- Create a UDP socket for M-SEARCH; socket.udp() returns nil plus a message
-- on failure, so check it before use.
local udp, udp_err = socket.udp()
if not udp then
  print("M-SEARCH request error:", udp_err)
else
  udp:setoption('broadcast', true)
  udp:setoption('ip-multicast-loop', true)
  udp:settimeout(5)
  -- Overall time budget for discovery. Without it the 5-second timeout would
  -- restart on every datagram, so a steady stream of non-matching replies
  -- could keep the loop running indefinitely.
  local deadline = socket.gettime() + 5
  -- Send the request and wait for a response
  local ok, err = udp:sendto(msearch_request, SSDP_ADDR, SSDP_PORT)
  if ok then
    while true do
      local remaining = deadline - socket.gettime()
      if remaining <= 0 then
        print("M-SEARCH timeout")
        break
      end
      udp:settimeout(remaining)
      -- On failure receivefrom returns nil followed by the error message.
      local data, recv_err = udp:receivefrom()
      if data then
        base_url = extract_base_url(data)
        if base_url then
          break -- Exit loop after finding the URL
        end
        -- Otherwise the reply had no usable LOCATION header; keep listening.
      else
        if recv_err == "timeout" then
          print("M-SEARCH timeout")
        else
          print("M-SEARCH error:", recv_err)
        end
        break
      end
    end
  else
    print("M-SEARCH request error:", err)
  end
  -- Close the M-SEARCH socket
  udp:close()
end
-- If a base URL was found, proceed with data acquisition
if base_url then
  local requests = {
    { action = "GetTotalBytesReceived", tag = "", label = "受信" },
    { action = "GetTotalBytesSent", tag = "", label = "送信" }
  }
  for _, req_info in ipairs(requests) do
    get_upnp_data(base_url, req_info)
  end
else
  print("SSDP no base url")
end