9
9
from pathlib import Path
10
10
from uuid import uuid4
11
11
12
- import pymisp
13
-
14
12
from intelmq .lib .bot import OutputBot
15
13
from intelmq .lib .exceptions import MissingDependencyError
16
- from ....lib .message import Message , MessageFactory
14
+ from ....lib .message import MessageFactory
17
15
from intelmq .lib .mixins import CacheMixin
18
16
from intelmq .lib .utils import parse_relative
19
17
20
18
try :
21
- from pymisp import MISPEvent , MISPOrganisation , NewAttributeError
19
+ from pymisp import MISPEvent , MISPObject , MISPOrganisation , NewAttributeError
22
20
from pymisp .tools import feed_meta_generator
23
21
except ImportError :
24
22
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
25
23
MISPEvent = None
26
24
import_fail_reason = "import"
27
25
26
+ DEFAULT_KEY = "default"
27
+
28
28
29
29
class MISPFeedOutputBot (OutputBot , CacheMixin ):
30
30
"""Generate an output in the MISP Feed format"""
@@ -38,6 +38,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin):
38
38
)
39
39
_is_multithreadable : bool = False
40
40
attribute_mapping : dict = None
41
+ event_separator : str = None
41
42
42
43
@staticmethod
43
44
def check_output_dir (dirname ):
@@ -50,7 +51,8 @@ def init(self):
50
51
if MISPEvent is None :
51
52
raise MissingDependencyError ("pymisp" , version = ">=2.4.117.3" )
52
53
53
- self .current_event = None
54
+ self .current_events = {}
55
+ self .current_files = {}
54
56
55
57
self .misp_org = MISPOrganisation ()
56
58
self .misp_org .name = self .misp_org_name
@@ -66,58 +68,57 @@ def init(self):
66
68
minutes = parse_relative (self .interval_event )
67
69
)
68
70
71
+ self .min_time_current = datetime .datetime .max
72
+ self .max_time_current = datetime .datetime .min
73
+
69
74
if (self .output_dir / ".current" ).exists ():
70
75
try :
71
76
with (self .output_dir / ".current" ).open () as f :
72
- self .current_file = Path (f .read ())
73
-
74
- if self .current_file .exists ():
75
- self .current_event = MISPEvent ()
76
- self .current_event .load_file (self .current_file )
77
-
78
- last_min_time , last_max_time = re .findall (
79
- "IntelMQ event (.*) - (.*)" , self .current_event .info
80
- )[0 ]
81
- last_min_time = datetime .datetime .strptime (
82
- last_min_time , "%Y-%m-%dT%H:%M:%S.%f"
83
- )
84
- last_max_time = datetime .datetime .strptime (
85
- last_max_time , "%Y-%m-%dT%H:%M:%S.%f"
86
- )
87
- if last_max_time < datetime .datetime .now ():
88
- self .min_time_current = datetime .datetime .now ()
89
- self .max_time_current = self .min_time_current + self .timedelta
90
- self .current_event = None
91
- else :
92
- self .min_time_current = last_min_time
93
- self .max_time_current = last_max_time
94
- except :
77
+ current = f .read ()
78
+
79
+ if not self .event_separator :
80
+ self .current_files [DEFAULT_KEY ] = Path (current )
81
+ else :
82
+ self .current_files = {
83
+ k : Path (v ) for k , v in json .loads (current ).items ()
84
+ }
85
+
86
+ for key , path in self .current_files .items ():
87
+ self ._load_event (path , key )
88
+ except Exception :
95
89
self .logger .exception (
96
- "Loading current event %s failed. Skipping it." , self .current_event
90
+ "Loading current events %s failed. Skipping it." , self .current_files
97
91
)
98
- self .current_event = None
99
- else :
92
+ self .current_events = {}
93
+
94
+ if not self .current_files or self .max_time_current < datetime .datetime .now ():
100
95
self .min_time_current = datetime .datetime .now ()
101
96
self .max_time_current = self .min_time_current + self .timedelta
97
+ self .current_events = {}
98
+
99
+ def _load_event (self , file_path : Path , key : str ):
100
+ if file_path .exists ():
101
+ self .current_events [key ] = MISPEvent ()
102
+ self .current_events [key ].load_file (file_path )
103
+
104
+ last_min_time , last_max_time = re .findall (
105
+ "IntelMQ event (.*) - (.*)" , self .current_events [key ].info
106
+ )[0 ]
107
+ last_min_time = datetime .datetime .strptime (
108
+ last_min_time , "%Y-%m-%dT%H:%M:%S.%f"
109
+ )
110
+ last_max_time = datetime .datetime .strptime (
111
+ last_max_time , "%Y-%m-%dT%H:%M:%S.%f"
112
+ )
113
+
114
+ self .min_time_current = min (last_min_time , self .min_time_current )
115
+ self .max_time_current = max (last_max_time , self .max_time_current )
102
116
103
117
def process (self ):
104
- if not self . current_event or datetime .datetime .now () > self .max_time_current :
118
+ if datetime .datetime .now () > self .max_time_current :
105
119
self .min_time_current = datetime .datetime .now ()
106
120
self .max_time_current = self .min_time_current + self .timedelta
107
- self .current_event = MISPEvent ()
108
- self .current_event .info = "IntelMQ event {begin} - {end}" "" .format (
109
- begin = self .min_time_current .isoformat (),
110
- end = self .max_time_current .isoformat (),
111
- )
112
- self .current_event .set_date (datetime .date .today ())
113
- self .current_event .Orgc = self .misp_org
114
- self .current_event .uuid = str (uuid4 ())
115
- self .current_file = self .output_dir / f"{ self .current_event .uuid } .json"
116
- with (self .output_dir / ".current" ).open ("w" ) as f :
117
- f .write (str (self .current_file ))
118
-
119
- # On startup or when timeout occurs, clean the queue to ensure we do not
120
- # keep events forever because there was not enough generated
121
+
121
122
self ._generate_feed ()
122
123
123
124
event = self .receive_message ().to_dict (jsondict_as_string = True )
@@ -128,19 +129,57 @@ def process(self):
128
129
129
130
if cache_size is None :
130
131
self ._generate_feed (event )
132
+ elif not self .current_events :
133
+ # Always create the first event so we can keep track of the interval.
134
+ # It also ensures cleaning the queue after startup in case of awaiting
135
+ # messages from the previous run
136
+ self ._generate_feed ()
131
137
elif cache_size >= self .bulk_save_count :
132
138
self ._generate_feed ()
133
139
134
140
self .acknowledge_message ()
135
141
142
+ def _generate_new_event (self , key ):
143
+ self .current_events [key ] = MISPEvent ()
144
+ self .current_events [key ].info = "IntelMQ event {begin} - {end}" "" .format (
145
+ begin = self .min_time_current .isoformat (),
146
+ end = self .max_time_current .isoformat (),
147
+ )
148
+ self .current_events [key ].set_date (datetime .date .today ())
149
+ self .current_events [key ].Orgc = self .misp_org
150
+ self .current_events [key ].uuid = str (uuid4 ())
151
+ self .current_files [key ] = (
152
+ self .output_dir / f"{ self .current_events [key ].uuid } .json"
153
+ )
154
+ with (self .output_dir / ".current" ).open ("w" ) as f :
155
+ if not self .event_separator :
156
+ f .write (str (self .current_files [key ]))
157
+ else :
158
+ json .dump ({k : str (v ) for k , v in self .current_files .items ()}, f )
159
+ return self .current_events [key ]
160
+
136
161
def _add_message_to_feed (self , message : dict ):
137
- obj = self .current_event .add_object (name = "intelmq_event" )
162
+ if not self .event_separator :
163
+ key = DEFAULT_KEY
164
+ else :
165
+ # For proper handling of nested fields
166
+ message_obj = MessageFactory .from_dict (
167
+ message , harmonization = self .harmonization , default_type = "Event"
168
+ )
169
+ key = message_obj .get (self .event_separator ) or DEFAULT_KEY
170
+
171
+ if key in self .current_events :
172
+ event = self .current_events [key ]
173
+ else :
174
+ event = self ._generate_new_event (key )
175
+
176
+ obj = event .add_object (name = "intelmq_event" )
138
177
if not self .attribute_mapping :
139
178
self ._default_mapping (obj , message )
140
179
else :
141
180
self ._custom_mapping (obj , message )
142
181
143
- def _default_mapping (self , obj : pymisp . MISPObject , message : dict ):
182
+ def _default_mapping (self , obj : " MISPObject" , message : dict ):
144
183
for object_relation , value in message .items ():
145
184
try :
146
185
obj .add_attribute (object_relation , value = value )
@@ -162,15 +201,15 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic
162
201
for parameter , value in definition .items ():
163
202
# Check if the value is a harmonization key or a static value
164
203
if isinstance (value , str ) and (
165
- value in self .harmonization ["event" ]
166
- or value .split ("." , 1 )[0 ] in self .harmonization ["event" ]
204
+ value in self .harmonization ["event" ] or
205
+ value .split ("." , 1 )[0 ] in self .harmonization ["event" ]
167
206
):
168
207
result [parameter ] = message .get (value )
169
208
else :
170
209
result [parameter ] = value
171
210
return result
172
211
173
- def _custom_mapping (self , obj : pymisp . MISPObject , message : dict ):
212
+ def _custom_mapping (self , obj : " MISPObject" , message : dict ):
174
213
for object_relation , definition in self .attribute_mapping .items ():
175
214
obj .add_attribute (
176
215
object_relation ,
@@ -188,9 +227,10 @@ def _generate_feed(self, message: dict = None):
188
227
self ._add_message_to_feed (message )
189
228
message = self .cache_pop ()
190
229
191
- feed_output = self .current_event .to_feed (with_meta = False )
192
- with self .current_file .open ("w" ) as f :
193
- json .dump (feed_output , f )
230
+ for key , event in self .current_events .items ():
231
+ feed_output = event .to_feed (with_meta = False )
232
+ with self .current_files [key ].open ("w" ) as f :
233
+ json .dump (feed_output , f )
194
234
195
235
feed_meta_generator (self .output_dir )
196
236
0 commit comments