Files
ServiceNow_XML_attachment_e…/test.py
2025-05-03 06:18:44 +00:00

186 lines
6.1 KiB
Python

import xml.etree.ElementTree as ET
from collections import defaultdict
import base64
import os,csv
import gzip
from io import BytesIO
# Load the XML file and keep both the parsed tree and its root element.
XML_EXPORT_PATH = 'sc_req_item (short_descriptionSTARTSWITHsolution security).xml'
tree = ET.parse(XML_EXPORT_PATH)
root = tree.getroot()

# Output directory for attachments; creating it is a no-op when it exists.
ATTACHMENTS_ROOT = "attachments"
os.makedirs(ATTACHMENTS_ROOT, exist_ok=True)
# Data containers
ritms = {}  # RITM sys_id -> summary dict for that <sc_req_item>
tasks = defaultdict(list)  # RITM sys_id -> journal entries
attachments = defaultdict(list)  # table_sys_id -> attachment sys_ids
attachment_docs = defaultdict(list)  # attachment sys_id -> base64 data chunks


def _display_value(record, tag):
    """Return the ``display_value`` attribute of child *tag*, or ``''``.

    ``Element.find`` returns ``None`` when *record* has no such child, so the
    result is checked before its attributes are read; a record that lacks the
    field yields an empty string instead of raising ``AttributeError``.
    """
    child = record.find(tag)
    if child is None:
        return ''
    return child.attrib.get('display_value', '')


# Parse RITMs: one summary dict per <sc_req_item>, keyed by the item's sys_id.
for item in root.findall('sc_req_item'):
    ritms[item.findtext('sys_id')] = {
        'ritm_number': item.findtext('number'),
        'short_description': item.findtext('short_description'),
        'opened_by': _display_value(item, 'opened_by'),
        'requested_for': _display_value(item, 'requested_for'),
    }
# Parse Journal Fields: keep only the entries whose element_id is the sys_id
# of a parsed RITM, grouped under that sys_id.
for journal in root.findall('sys_journal_field'):
    element_id = journal.findtext('element_id')
    if element_id not in ritms:
        continue
    tasks[element_id].append({
        'type': journal.findtext('element'),
        # findtext() returns None when <value> is absent; defaulting to ''
        # keeps .strip() from raising AttributeError on such an entry.
        'value': journal.findtext('value', default='').strip(),
    })
# Parse Attachment Metadata
#   attachment_info: attachment sys_id -> metadata dict
#   attachments:     table_sys_id      -> sys_ids of the attachments on it
# Each metadata key is also the name of the child element it is read from.
_ATTACHMENT_FIELDS = ('file_name', 'content_type', 'size_bytes', 'table_sys_id')

attachment_info = {}
for record in root.findall('sys_attachment'):
    record_id = record.findtext('sys_id')
    metadata = {field: record.findtext(field) for field in _ATTACHMENT_FIELDS}
    attachment_info[record_id] = metadata
    attachments[metadata['table_sys_id']].append(record_id)
# Parse Attachment Data Chunks
for chunk_doc in root.findall('sys_attachment_doc'):
    # The owning attachment is identified by the sys_id attribute of the
    # chunk's <sys_attachment> child element.
    owner = chunk_doc.find('sys_attachment')
    if owner is None:
        continue
    owner_id = owner.attrib.get('sys_id')
    payload = chunk_doc.findtext('data')
    # Chunks lacking either an owner id or a payload are not recorded.
    if not owner_id or not payload:
        continue
    attachment_docs[owner_id].append(payload)
# Save Attachment Files
#
# Every <sys_attachment_doc> chunk is indexed by its owning attachment in a
# single pass, so the document is not rescanned once per attachment.
chunks_by_attachment = defaultdict(list)
for doc in root.findall('sys_attachment_doc'):
    owner = doc.find('sys_attachment')
    chunk_data = doc.findtext('data')
    if owner is None or not chunk_data:
        continue  # no attachment to assign the chunk to, or nothing to decode
    owner_id = owner.attrib.get('sys_id')
    if not owner_id:
        continue
    # A chunk without a <position> is ordered as position 0 instead of
    # aborting the run on int(None).
    chunk_position = int(doc.findtext('position') or 0)
    chunks_by_attachment[owner_id].append((chunk_position, chunk_data))

written_paths = set()  # output paths already used during this run
for attach_sys_id, info in attachment_info.items():
    chunk_entries = chunks_by_attachment.get(attach_sys_id)
    if not chunk_entries:
        continue

    # An attachment whose table_sys_id is not one of the parsed RITMs has no
    # RITM folder to go into; skip it rather than fail with KeyError.
    ritm = ritms.get(info['table_sys_id'])
    if ritm is None or not ritm['ritm_number']:
        print(f"Skipped attachment {attach_sys_id}: no matching RITM")
        continue

    # Each chunk is base64-decoded on its own and the decoded pieces are
    # joined in <position> order.
    chunk_entries.sort(key=lambda entry: entry[0])
    raw_data = b''.join(base64.b64decode(data) for _, data in chunk_entries)

    # Attempt to decompress if it's gzipped. Data that is not gzip raises
    # OSError; a gzip stream that is cut short raises EOFError. In both cases
    # the decoded bytes are written unchanged.
    try:
        raw_data = gzip.decompress(raw_data)
    except (OSError, EOFError):
        pass

    # Create subfolder named after the RITM number
    output_dir = os.path.join("attachments", ritm['ritm_number'])
    os.makedirs(output_dir, exist_ok=True)

    # file_name is read from the export, so only its final path component is
    # used: directory parts must not steer the write outside output_dir. A
    # missing or unusable name falls back to the attachment's sys_id.
    file_name = os.path.basename(info['file_name'] or '')
    if file_name in ('', '.', '..'):
        file_name = attach_sys_id
    output_path = os.path.join(output_dir, file_name)
    # Two attachments on one RITM may share a file name; prefix the later one
    # with its sys_id so it does not overwrite the earlier file.
    if output_path in written_paths:
        output_path = os.path.join(output_dir, f"{attach_sys_id}_{file_name}")
    written_paths.add(output_path)

    with open(output_path, 'wb') as f:
        f.write(raw_data)
    print(f"Saved attachment: {output_path}")
# Combine everything: hang each RITM's journal entries and attachment
# metadata on its summary dict, then collect the dicts in the insertion
# order of `ritms`.
for ritm_id, summary in ritms.items():
    summary['tasks'] = tasks.get(ritm_id, [])
    summary['attachments'] = [
        attachment_info[attachment_id]
        for attachment_id in attachments.get(ritm_id, [])
    ]
ritm_data = list(ritms.values())
# Print results: the report for each RITM is assembled line by line and
# emitted with a single print() call.
for record in ritm_data:
    report = [
        f"\nRITM: {record['ritm_number']}",
        f" Description: {record['short_description']}",
        f" Opened By: {record['opened_by']}",
        f" Requested For: {record['requested_for']}",
        " Tasks / Notes:",
    ]
    report.extend(
        f" - [{note['type']}] {note['value']}" for note in record['tasks']
    )
    report.append(" Attachments:")
    report.extend(
        f" - {meta['file_name']} ({meta['content_type']}, {meta['size_bytes']} bytes)"
        for meta in record['attachments']
    )
    print("\n".join(report))
# Combine everything into RITM data structure
# NOTE(review): this appears to repeat the "Combine everything" step that
# runs before the results are printed; the rebuild assigns the same keys
# again and produces a list with the same contents.
ritm_data = []
for ritm_id, record in ritms.items():
    record.update(
        tasks=tasks.get(ritm_id, []),
        attachments=[attachment_info[ref] for ref in attachments.get(ritm_id, [])],
    )
    ritm_data.append(record)
# Write RITMs to CSV
# The encoding is pinned to UTF-8: without it open() uses the platform's
# locale encoding, which cannot represent every character on some systems and
# then fails with UnicodeEncodeError part-way through the export.
with open('ritms.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['ritm_number', 'short_description', 'opened_by', 'requested_for']
    # The RITM dicts also carry 'tasks' and 'attachments'; extrasaction='ignore'
    # lets them be written directly while only `fieldnames` reach the file.
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
    writer.writeheader()
    writer.writerows(ritm_data)
# Write Attachments to CSV: one row per attachment, tagged with the number of
# the RITM it belongs to.
# The encoding is pinned to UTF-8: without it open() uses the platform's
# locale encoding, which cannot represent every character on some systems and
# then fails with UnicodeEncodeError part-way through the export.
with open('attachments.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['ritm_number', 'file_name', 'content_type', 'size_bytes']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(
        {
            'ritm_number': r['ritm_number'],
            'file_name': a['file_name'],
            'content_type': a['content_type'],
            'size_bytes': a['size_bytes'],
        }
        for r in ritm_data
        for a in r['attachments']
    )
# Write Tasks to CSV: one row per journal entry, tagged with the number of
# the RITM it belongs to.
# The encoding is pinned to UTF-8: without it open() uses the platform's
# locale encoding, which cannot represent every character on some systems and
# then fails with UnicodeEncodeError part-way through the export.
with open('tasks.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['ritm_number', 'type', 'value']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(
        {
            'ritm_number': r['ritm_number'],
            'type': t['type'],
            'value': t['value'],
        }
        for r in ritm_data
        for t in r['tasks']
    )
print("CSV files generated successfully.")