文件結構與數據分析專項-解析
在https://exam.didctf.com/practice/questions可以找到題目

出這套題主要是想鼓勵大家在遇到陌生的文件時,可以主動地去對這類文件進行分析(尤其是將多個文件打包在一起),希望能通過專項練習得到這方面的提升。
源碼
這邊先給出源碼,先是main.go
package main
import (
"crypto/rand"
"fmt"
)
func main() {
key := make([]byte, 256)
if _, err := rand.Read(key); err != nil {
fmt.Println("Error generating key:", err)
return
}
if err := PackDir("files", "output.pak", key); err != nil {
panic(err)
}
if err := UnpackFile("output.pak", "output"); err != nil {
panic(err)
}
}
然后是packer.go,這里就是主要邏輯
package main
import (
"bytes"
"compress/gzip"
"crypto/rc4"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"unsafe"
)
type Packer struct {
Header [4]byte
FileCount uint32
RC4Key [256]byte
}
type FileInfo struct {
CreateTime uint64
ModifyTime uint64
FileSize uint64
GzipSize uint64
FileNameLen uint16
FileName string
FileData []byte
}
var pool = &sync.Pool{
New: func() interface{} {
return &syscall.Filetime{}
},
}
func uint64FromFiletime(filetime *syscall.Filetime) uint64 {
result := *(*uint64)(unsafe.Pointer(filetime))
return result
}
func Timestamp(t time.Time) uint64 {
filetime := pool.Get().(*syscall.Filetime)
defer pool.Put(filetime)
*filetime = syscall.NsecToFiletime(t.UnixNano())
return uint64FromFiletime(filetime)
}
func FileTime(t syscall.Filetime) uint64 {
return uint64FromFiletime(&t)
}
func GzipCompress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write(data)
if err != nil {
return nil, err
}
w.Close()
return buf.Bytes(), nil
}
func GzipDecompress(data []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer r.Close()
return io.ReadAll(r)
}
func Rc4Encrypt(key, data []byte) []byte {
dst := make([]byte, len(data))
c, _ := rc4.NewCipher(key)
c.XORKeyStream(dst, data)
return dst
}
func DeriveNextKey(prevData, baseKey []byte) []byte {
if len(prevData) >= 256 {
return prevData[len(prevData)-256:]
}
need := 256 - len(prevData)
newKey := append([]byte{}, prevData...)
newKey = append(newKey, baseKey[:need]...)
return newKey
}
func PackDir(dir, outFile string, baseKey []byte) error {
entries, err := os.ReadDir(dir)
if err != nil {
return err
}
var packer Packer
copy(packer.Header[:], []byte("PACK"))
copy(packer.RC4Key[:], baseKey[:256])
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, &packer); err != nil {
return err
}
curKey := packer.RC4Key[:]
fileCount := uint32(0)
for _, entry := range entries {
if entry.IsDir() {
continue
}
path := filepath.Join(dir, entry.Name())
info, err := os.Stat(path)
if err != nil {
return err
}
data, err := os.ReadFile(path)
if err != nil {
return err
}
gz, err := GzipCompress(data)
if err != nil {
return err
}
enc := Rc4Encrypt(curKey, gz)
var ctime, mtime time.Time
if stat, ok := info.Sys().(*syscall.Win32FileAttributeData); ok {
ctime = time.Unix(0, stat.CreationTime.Nanoseconds())
mtime = time.Unix(0, stat.LastWriteTime.Nanoseconds())
} else {
ctime = info.ModTime()
mtime = info.ModTime()
}
fi := FileInfo{
CreateTime: Timestamp(ctime),
ModifyTime: Timestamp(mtime),
FileSize: uint64(info.Size()),
GzipSize: uint64(len(enc)),
FileNameLen: uint16(len(entry.Name())),
FileName: entry.Name(),
FileData: enc,
}
if err := binary.Write(buf, binary.LittleEndian, fi.CreateTime); err != nil {
return err
}
if err := binary.Write(buf, binary.LittleEndian, fi.ModifyTime); err != nil {
return err
}
if err := binary.Write(buf, binary.LittleEndian, fi.FileSize); err != nil {
return err
}
if err := binary.Write(buf, binary.LittleEndian, fi.GzipSize); err != nil {
return err
}
if err := binary.Write(buf, binary.LittleEndian, fi.FileNameLen); err != nil {
return err
}
if _, err := buf.Write([]byte(fi.FileName)); err != nil {
return err
}
if _, err := buf.Write(fi.FileData); err != nil {
return err
}
fileCount++
curKey = DeriveNextKey(gz, packer.RC4Key[:])
}
packer.FileCount = fileCount
out := buf.Bytes()
binary.LittleEndian.PutUint32(out[4:8], fileCount)
return os.WriteFile(outFile, out, 0644)
}
func UnpackFile(packFile, outDir string) error {
data, err := os.ReadFile(packFile)
if err != nil {
return err
}
buf := bytes.NewReader(data)
var p Packer
if err := binary.Read(buf, binary.LittleEndian, &p); err != nil {
return err
}
curKey := p.RC4Key[:]
for i := uint32(0); i < p.FileCount; i++ {
var fi FileInfo
if err := binary.Read(buf, binary.LittleEndian, &fi.CreateTime); err != nil {
return err
}
if err := binary.Read(buf, binary.LittleEndian, &fi.ModifyTime); err != nil {
return err
}
if err := binary.Read(buf, binary.LittleEndian, &fi.FileSize); err != nil {
return err
}
if err := binary.Read(buf, binary.LittleEndian, &fi.GzipSize); err != nil {
return err
}
if err := binary.Read(buf, binary.LittleEndian, &fi.FileNameLen); err != nil {
return err
}
name := make([]byte, fi.FileNameLen)
if _, err := io.ReadFull(buf, name); err != nil {
return err
}
fi.FileName = string(name)
enc := make([]byte, fi.GzipSize)
if _, err := io.ReadFull(buf, enc); err != nil {
return err
}
dec := Rc4Encrypt(curKey, enc)
raw, err := GzipDecompress(dec)
if err != nil {
return err
}
outPath := filepath.Join(outDir, fi.FileName)
if err := os.WriteFile(outPath, raw, 0644); err != nil {
return err
}
fmt.Println("Unpacked:", outPath)
curKey = DeriveNextKey(dec, p.RC4Key[:])
}
return nil
}
出題思路
先準備若干文件,這里模擬的是一個傳銷平臺場景,數據用python生成,插入到數據庫(純粹是為了解答案方便)當中,然后導出作為待打包文件,包含數據文件和表結構文件。
設定的打包文件邏輯是,在文件中存儲一些元數據,包括文件數量、創建時間、修改時間、原始大小、壓縮后大小、文件名稱。然后對原始數據進行壓縮,并使用rc4加密,初始密鑰隨機,后續文件密鑰使用上一個文件壓縮結果的后256字節。
這樣下來如果想要解包,則需要先找到初始密鑰,然后解密第一個文件,得到壓縮后的數據,再用最后256字節繼續向下解密,如此往復。
為了簡便,只實現了打包1個目錄下的文件,沒有做遞歸這些,編譯時注釋掉解包代碼。
import random
import pymysql
from faker import Faker
from datetime import datetime, timedelta
# 數據庫連接配置
DB_CONFIG = {
"host": "192.168.31.5",
"user": "root",
"password": "123456",
"database": "test",
"charset": "utf8mb4"
}
faker = Faker("zh_CN") # 生成中文數據
# 插入會員數據
def insert_members(cursor, total_members=100):
members = []
start_time = datetime(2008, 1, 1, 0, 0, 0)
for i in range(1, total_members + 1):
nickname = faker.user_name()
gender = random.choice([0, 1, 2])
real_name = faker.name()
mobile = faker.phone_number()
id_card = faker.ssn()
address = faker.address()
bank_card = faker.credit_card_number()
# 創建時間隨機遞增 1~5 小時
if i == 1:
create_time = start_time
else:
delta_hours = random.randint(1, 5)
create_time = members[-1]["create_time"] + timedelta(hours=delta_hours)
# 上級會員邏輯
if i == 1:
superior_id = None
else:
candidate_size = max(5, int((i - 1) * 0.5))
start_id = max(1, i - candidate_size)
superior_id = random.randint(start_id, i - 1)
member = {
"member_id": i,
"nickname": nickname,
"gender": gender,
"real_name": real_name,
"mobile": mobile,
"id_card": id_card,
"address": address,
"bank_card": bank_card,
"superior_id": superior_id,
"wallet_balance": 0.00,
"create_time": create_time
}
members.append(member)
cursor.execute("""
INSERT INTO member (member_id, nickname, gender, real_name, mobile, id_card, address, bank_card, superior_id, wallet_balance, create_time)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", (
member["member_id"],
member["nickname"],
member["gender"],
member["real_name"],
member["mobile"],
member["id_card"],
member["address"],
member["bank_card"],
member["superior_id"],
member["wallet_balance"],
member["create_time"]
))
return members
# 插入流水數據
def insert_transaction_flows(cursor, members, total_flows=300):
flows = []
flow_id = 1
members_with_flows = random.sample(members, k=len(members) // 2)
for _ in range(total_flows):
member = random.choice(members)
if member not in members_with_flows and random.random() < 0.5:
member = random.choice(members_with_flows)
member_id = member["member_id"]
wallet_balance = member["wallet_balance"]
# 流水時間遞增 1~200 分鐘
if flows:
last_time = flows[-1]["create_time"]
else:
last_time = member["create_time"]
delta_minutes = random.randint(1, 10)
create_time = last_time + timedelta(seconds=delta_minutes)
amount = random.randint(100, 1000)
if wallet_balance <= 0:
flow_type = 1
else:
flow_type = random.choice([1, 2])
if flow_type == 1:
new_balance = wallet_balance + amount
else:
new_balance = wallet_balance - amount
if new_balance < 0:
flow_type = 1
new_balance = wallet_balance + amount
member["wallet_balance"] = new_balance
flow = {
"flow_id": flow_id,
"member_id": member_id,
"flow_type": flow_type,
"amount": amount,
"create_time": create_time
}
flows.append(flow)
cursor.execute("""
INSERT INTO transaction_flow (flow_id, member_id, flow_type, amount, create_time)
VALUES (%s,%s,%s,%s,%s)
""", (
flow["flow_id"],
flow["member_id"],
flow["flow_type"],
flow["amount"],
flow["create_time"]
))
flow_id += 1
def main():
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor()
try:
print("插入會員數據中...")
members = insert_members(cursor, total_members=3500000)
conn.commit()
# print("插入流水數據中...")
# insert_transaction_flows(cursor, members, total_flows=456789)
# conn.commit()
print("數據插入完成!")
except Exception as e:
conn.rollback()
print("發生錯誤:", e)
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
main()
解題過程
首先的2題,讓查看packer.exe的信息,此時一般就會使用DIE、EXEInfoPe這類工具,再看后面的問題,應當養成習慣,先用IDA加載了再說。
IDA9.2之前,對go的字符串支持幾乎為0,可以用BinaryNinja替代,9.2開始可以正常反編譯出字符串(如果能看到字符串,則會有比較大的突破)
1、2跳過,來到第3題
文件output.pak是由packer.exe生成的文件,該文件中包含了幾個文件?(答案格式:0)
先來看反編譯結果,可以清晰地看到程序可能使用了rc4、gzip、timestamp這幾樣東西,并且打包函數是main_PackDir

這里可以看到v61,他聲明的長度是264,為什么是這個數?并且還有字符串PACK在,這占了4個字節,后面的從第9個字節開始的256個字節從a5中復制過來,這是之前main中的v3,通過crypto_rand_Read生成了256字節的隨機密鑰

也就是說,目前文件頭部的結構是PACK+4字節+256字節隨機密鑰,那中間的4字節是什么呢?結合提問其實就可以猜測,它就是記錄的文件數量,我們帶著猜測看一下他的賦值
這里可以看到v15由v60+1得來,從宏觀看這部分的代碼,其實就是在循環中記錄+1,這里就可以確定結果了

此時查看文件內容,就可以知道打包了4個文件

此外,在ida解析出的LocalTypes中可以看到這個結構體(不知道為啥另一個結構體看不到),可以很輕松得到結果

到這里,3、4、5、6、7都可以解出來了,密鑰是
文件output.pak是由packer.exe生成的文件,該文件中包含了幾個文件?(答案格式:0)
文件output.pak中存在一個密鑰,該密鑰的長度是多少字節?(答案格式:0)
給出密鑰的MD5值。(答案格式:e10adc3949ba59abbe56e057f20f883e)
output.pak中包含的文件使用的加密算法是什么?(答案格式:des)
output.pak中包含的文件使用的壓縮算法是什么?(答案格式:zip)
4F7C6DA92323508E512FC92952F0D4552B164894107FC252621391F9978F61EB1A3D1915BFFE70931D326F9972721ACEEB813B7C6D3E4CA29699DCEF2171CAC043675C7B5F61A98F5CDE6439435CFF60EC76915C7E0DFFDAE9EB89596DC6A5B8B2E0DF61E415E78AC1C1BBF6F056EC4E74C15891DFC942EB732832022651ADC60EA139C993733C17C19D60137375C363E9693B7E0E04BCBAAEF89D14D70D752A8DF0525A6D3C9A78E583774DEA272B57038401BA9C27F54DBB8585FEDA71DF4A46D036AB1795ED75BA866189F57D130B8A9891515F2EC7659E956258F3FCCED8D6A741D6F80109A140B69550DF7650FC51DA590E96EAA2F82222B90F4E16AC09
接下來是8、9、10
member.txt的原始大小是多少字節?(答案格式:10086)
member.txt被壓縮后的大小是多少字節?(答案格式:10086)
創建時間為北京時間2025-09-09 11:59:38的文件的文件名是什么?(答案格式:abc123.def)
從反編譯的結果來看,就是循環讀取文件、壓縮、加密

然后一步步寫進文件

我們側重一下看寫入的是啥,其實主要是跟v94有關,這里先連著寫了4個8字節的uint64數據

接著寫了1個2字節的uint16數據,然后寫入了1個字符串v119(go中的字符串會包含地址和長度),最后將剩下的數據寫入,最后面的v94[7],向上跟一下就知道是加密結果


最先寫入的v82和v80,顯然能看到是時間戳,不難看出獲取的是WindowsFileTime


那么我們回到打包后的文件確認一下,確實如此,現在需要找到的是兩個時間分別表示什么,從下面的數據來看,第二個時間是大于第一個時間的,雖然在秒級上是一樣的,但是FileTime是100納秒級,還是能看出差別


結合題目會問的創建時間,可以推測其中一個是創建時間,那么另一個就是修改時間或者訪問時間,但是從代碼里看簡直就是災難

這里我們可以回到main中,能夠知道程序會讀取files目錄,然后生成output.pak文件,那么為何不執行一次呢?

新建一個files目錄,導入1個文件,然后執行,對比結果。這樣就可以知道,前面一個是創建時間,后面一個是修改時間(到底是修改時間還是訪問時間,保存文件后,過一會再打開,就可以使這倆個值不一樣了,比對一下就能知道)

還記得程序是連續寫入了4個8字節的uint64,1個2字節的uint16嗎?現在我們可以比對了,前面2個是時間,那后面2個呢?從題目的提問來看,大概率是文件大小相關的內容。
我們其實已經可以看見,第3個uint64的值是4,對應了文件大小4字節,第4個uint64的值是28,根據提問,這就對應了壓縮后的字節大?。ㄓ捎趬嚎s算法需要包含一些元數據,特別小的文件壓縮后可能會變大),又由于我們使用的加密算法是RC4,不會影響數據長度,所以獲得密鑰后就可以直接解析了。
接下來的uint16和字符串就很清晰了,uint16的值是7,字符串是aaa.txt,顯然分別對應文件名長度和文件名稱
從反編譯的結果來看,就是讀取了目錄下的所有文件,然后遍歷這些文件進行處理,最后讀取了創建時間、修改時間、文件大小、文件名這些數據并寫入結構體

接下來是11、12兩題
packer.exe在進行一次打包時,使用的密鑰是否會發生變化?(答案格式:是或否)
member.txt的MD5值是多少?(答案格式:e10adc3949ba59abbe56e057f20f883e)
第11題顯然提醒我們,在單詞打包中,密鑰可能會發生變化,同時,這里也降低了難度,member.txt是打包的第一個文件,他的密鑰是存儲在文件頭部的,所以無須寫出完整的unpacker也能拿到這個文件并做一些題目
我們之前分析過,v61后面就是密鑰


這里v116是壓縮結果,v98就是對應的密鑰,那么這里不難發現,每次循環,v98都是由v14進行賦值

根據v71進行判斷,分成小于256和小于等于256兩種情況


顯然用后者更方便,else里面只有3行代碼,v93就是壓縮結果,v71是長度,這一段意思就是取最后256字節

而else之前,可以分析出,獲取256-壓縮長度,將壓縮結果和初始密鑰對應長度的數據拼接得到新的密鑰

這樣一來,我們就可以實現解包函數了,具體邏輯為
1.獲取文件數量,確認要解包的文件數
2.讀取初始密鑰
3.讀取元數據(創建時間、修改時間、文件大小、文件名等)
4.依據壓縮結果長度,從文件名之后讀取對應長度的數據,用初始密鑰進行解密、解壓
5.后續如此循環往復,但密鑰需要根據之前的解密結果生成,數據大于等于256字節時,直接取最后256字節為密鑰,否則和初始密鑰拼接,取前256字節
后續數據分析部分,直接給sql
當前余額和流水不符的會員ID有?(答案格式:12,13,14)
層級關系一共有多少層?(答案格式:1)
第100層有多少會員?(答案格式:1)
性別和身份證號碼能對應上的會員數量?(答案格式:1)
計算流水不符的情況
select m.member_id,m.wallet_balance,a.balance from member m
left join (select tf.member_id,sum(
case tf.flow_type
when 1 then tf.amount
when 2 then -tf.amount
end
) as balance from transaction_flow tf group by tf.member_id) a on m.member_id = a.member_id
where m.wallet_balance <> a.balance;
層級部分直接用levelTree或其他工具,能算出結果即可
校驗性別
select
member_id,
gender,
case SUBSTRING(id_card, LENGTH(id_card) - 1, 1) % 2
when 1 then 1
when 0 then 2
end AS id_gender
from
member
having
id_gender = gender

浙公網安備 33010602011771號