1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
| from neo4j import GraphDatabase from typing import List, Dict, Optional import asyncio from datetime import datetime, timedelta import json
class EnterpriseGraphService:
    """Graph analytics on top of a Neo4j database.

    Provides:

    * bulk loading of a social network (``User`` nodes joined by ``FOLLOWS``
      relationships),
    * friend-of-friend user recommendations,
    * a heuristic fraud-risk score for an ``Account`` node.

    All queries go through the single driver created in ``__init__``; call
    :meth:`close` when finished.
    """

    # MERGE users by id and (re)set their properties. COUNT(u) counts every
    # row processed, including users that already existed.
    _CREATE_USERS_QUERY = """
    UNWIND $batch as user
    MERGE (u:User {id: user.id})
    SET u.name = user.name,
        u.email = user.email,
        u.age = user.age,
        u.interests = user.interests,
        u.joinDate = datetime(user.joinDate)
    RETURN COUNT(u) as created
    """

    # MERGE a FOLLOWS edge between two existing users. Rows whose endpoints
    # are not found are dropped by the MATCH and are not counted.
    _CREATE_FOLLOWS_QUERY = """
    UNWIND $batch as rel
    MATCH (u1:User {id: rel.from_id})
    MATCH (u2:User {id: rel.to_id})
    MERGE (u1)-[r:FOLLOWS]->(u2)
    SET r.timestamp = datetime(rel.timestamp),
        r.strength = rel.strength
    RETURN COUNT(r) as created
    """

    # Interests are coalesced to [] so a user without an `interests` property
    # scores 0 common interests instead of nulling out the whole score.
    # The score is computed in its own WITH, after recentPosts is aggregated.
    _RECOMMENDATION_QUERY = """
    MATCH (target:User {id: $user_id})
    // 基于共同好友的推荐
    MATCH (target)-[:FOLLOWS]->(friend)-[:FOLLOWS]->(recommendation:User)
    WHERE NOT (target)-[:FOLLOWS]->(recommendation)
      AND recommendation <> target
    WITH target, recommendation, COUNT(DISTINCT friend) as mutualFriends
    // 计算兴趣相似度
    WITH recommendation, mutualFriends,
         SIZE([interest IN coalesce(target.interests, [])
               WHERE interest IN coalesce(recommendation.interests, [])]) as commonInterests
    // 获取推荐用户的活跃度
    OPTIONAL MATCH (recommendation)-[:POSTED]->(post:Post)
    WHERE post.timestamp >= datetime() - duration({days: 30})
    WITH recommendation, mutualFriends, commonInterests, COUNT(post) as recentPosts
    // 计算综合推荐分数
    WITH recommendation, mutualFriends, commonInterests, recentPosts,
         mutualFriends * 2 + commonInterests * 3 + recentPosts * 0.1 as score
    WHERE score > 1 // 过滤低分推荐
    RETURN recommendation.id as userId,
           recommendation.name as name,
           recommendation.interests as interests,
           mutualFriends,
           commonInterests,
           recentPosts,
           score
    ORDER BY score DESC
    LIMIT $limit
    """

    # Transactions are OPTIONAL-matched so an account with no activity in the
    # window still yields one row (count 0, sum 0, no targets). Risk accounts
    # are counted DISTINCT and exclude the account itself, so multiple paths
    # to the same flagged account do not inflate the score.
    _FRAUD_QUERY = """
    MATCH (account:Account {id: $account_id})
    // 分析最近交易行为
    OPTIONAL MATCH (account)-[txn:TRANSACTION]->(target:Account)
    WHERE txn.timestamp >= datetime() - duration({hours: $hours})
    WITH account,
         COUNT(txn) as transactionCount,
         SUM(txn.amount) as totalAmount,
         COLLECT(DISTINCT target.id) as uniqueTargets
    // 检查关联风险账户
    OPTIONAL MATCH (account)-[:ASSOCIATED_WITH*1..2]-(riskAccount:Account)
    WHERE riskAccount.status IN ['SUSPENDED', 'FLAGGED', 'FROZEN']
      AND riskAccount <> account
    WITH account, transactionCount, totalAmount,
         SIZE(uniqueTargets) as targetCount,
         COUNT(DISTINCT riskAccount) as riskAssociations
    // 计算风险分数
    WITH account, transactionCount, totalAmount, targetCount, riskAssociations,
         // 复合风险评分算法
         CASE WHEN transactionCount > 100 THEN 5
              WHEN transactionCount > 50 THEN 3
              ELSE 1 END
         + CASE WHEN targetCount > 30 THEN 4
                WHEN targetCount > 15 THEN 2
                ELSE 0 END
         + CASE WHEN totalAmount > 50000 THEN 3
                WHEN totalAmount > 20000 THEN 2
                ELSE 0 END
         + riskAssociations * 2 as riskScore
    RETURN account.id as accountId,
           transactionCount,
           totalAmount,
           targetCount,
           riskAssociations,
           riskScore,
           CASE WHEN riskScore >= 10 THEN 'HIGH'
                WHEN riskScore >= 6 THEN 'MEDIUM'
                WHEN riskScore >= 3 THEN 'LOW'
                ELSE 'MINIMAL' END as riskLevel
    """

    def __init__(self, uri: str, user: str, password: str):
        """Create the Neo4j driver.

        Args:
            uri: Connection URI, e.g. ``"bolt://localhost:7687"``.
            user: Database user name.
            password: Database password.
        """
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    async def bulk_import_social_network(
        self,
        users_data: List[Dict],
        relationships_data: List[Dict],
        batch_size: int = 1000,
    ):
        """Bulk-import users, then FOLLOWS relationships, in batches.

        The driver used here is synchronous. The work is therefore run in the
        event loop's default executor, so awaiting this coroutine does not
        block other tasks on the loop.

        Args:
            users_data: Dicts with keys ``id``, ``name``, ``email``, ``age``,
                ``interests`` and ``joinDate`` (a value accepted by Cypher's
                ``datetime()``).
            relationships_data: Dicts with keys ``from_id``, ``to_id``,
                ``timestamp`` and ``strength``. Rows whose endpoint users do
                not exist are skipped by the query's MATCH.
            batch_size: Rows per write transaction (default 1000).

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, self._bulk_import_sync, users_data, relationships_data, batch_size
        )

    def _bulk_import_sync(self, users_data, relationships_data, batch_size):
        """Blocking body of bulk_import_social_network (runs in a worker thread)."""
        with self.driver.session() as session:
            # Users first, so the relationship MATCHes can find both endpoints.
            self._write_in_batches(
                session, self._CREATE_USERS_QUERY, users_data, batch_size, "users"
            )
            self._write_in_batches(
                session,
                self._CREATE_FOLLOWS_QUERY,
                relationships_data,
                batch_size,
                "relationships",
            )

    def _write_in_batches(self, session, query, rows, batch_size, label):
        """Write ``rows`` in slices of ``batch_size``, one transaction per slice."""
        for start in range(0, len(rows), batch_size):
            batch = rows[start:start + batch_size]
            created = session.execute_write(self._run_batch_write, query, batch)
            print(f"Created {created} {label} in batch {start // batch_size + 1}")

    @staticmethod
    def _run_batch_write(tx, query, batch_data):
        """Transaction function: run an UNWIND write and return its ``created`` count."""
        return tx.run(query, batch=batch_data).single()["created"]

    def get_advanced_recommendations(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Recommend users followed by the people ``user_id`` follows.

        Candidates exclude the user and anyone the user already follows. Each
        candidate is scored as
        ``2 * mutualFriends + 3 * commonInterests + 0.1 * recentPosts``:

        * ``mutualFriends`` counts distinct followees who follow the candidate.
        * ``commonInterests`` counts the user's interests that the candidate
          shares; a missing ``interests`` property counts as none.
        * ``recentPosts`` counts the candidate's ``POSTED`` Post nodes from the
          last 30 days.

        Only candidates scoring above 1 are returned.

        Args:
            user_id: ``id`` of the target ``User`` node.
            limit: Maximum number of recommendations to return.

        Returns:
            Up to ``limit`` dicts, highest score first, with keys ``userId``,
            ``name``, ``interests``, ``mutualFriends``, ``commonInterests``,
            ``recentPosts`` and ``score``.
        """
        query = self._RECOMMENDATION_QUERY

        def recommendation_transaction(tx, uid, lmt):
            # Materialize the records inside the transaction.
            result = tx.run(query, user_id=uid, limit=lmt)
            return [record.data() for record in result]

        with self.driver.session() as session:
            return session.execute_read(recommendation_transaction, user_id, limit)

    def detect_fraud_patterns(self, account_id: str, hours_back: int = 24) -> Dict:
        """Compute a heuristic fraud-risk score for one account.

        The score combines:

        * the number of outgoing TRANSACTIONs in the last ``hours_back`` hours;
        * the number of distinct recipient accounts;
        * the total amount sent;
        * the number of distinct accounts within two ``ASSOCIATED_WITH`` hops
          whose status is SUSPENDED, FLAGGED or FROZEN.

        An account with no recent transactions is scored normally rather than
        failing.

        Args:
            account_id: ``id`` of the ``Account`` node.
            hours_back: Size of the look-back window in hours.

        Returns:
            Dict with keys ``accountId``, ``transactionCount``,
            ``totalAmount``, ``targetCount``, ``riskAssociations``,
            ``riskScore`` and ``riskLevel`` (HIGH / MEDIUM / LOW / MINIMAL).

        Raises:
            ValueError: If no ``Account`` with ``account_id`` exists.
        """
        query = self._FRAUD_QUERY

        def fraud_detection_transaction(tx, acc_id, hours):
            record = tx.run(query, account_id=acc_id, hours=hours).single()
            # single() returns None when the query produced no row, which
            # here only happens if the account itself does not exist.
            return None if record is None else record.data()

        with self.driver.session() as session:
            analysis = session.execute_read(
                fraud_detection_transaction, account_id, hours_back
            )
        if analysis is None:
            raise ValueError(f"Account not found: {account_id}")
        return analysis
async def main():
    """Demo entry point: print recommendations and a fraud analysis.

    Connection settings come from the NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD
    environment variables. When a variable is unset, the previous hard-coded
    local default is used.
    """
    import os  # only the demo entry point needs the environment

    graph_service = EnterpriseGraphService(
        os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
        os.environ.get("NEO4J_USER", "neo4j"),
        os.environ.get("NEO4J_PASSWORD", "password"),
    )
    try:
        recommendations = graph_service.get_advanced_recommendations(
            "user_12345", limit=5
        )
        # ensure_ascii=False prints non-ASCII names/interests as-is instead of
        # \uXXXX escapes.
        print("推荐结果:", json.dumps(recommendations, indent=2, ensure_ascii=False))

        fraud_analysis = graph_service.detect_fraud_patterns(
            "account_67890", hours_back=48
        )
        print("欺诈分析:", json.dumps(fraud_analysis, indent=2, ensure_ascii=False))
    finally:
        # Always release the driver, even if a query fails.
        graph_service.close()
if __name__ == "__main__":
    # Run the async demo entry point on a fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|